diff --git a/src/Internal/Socket.php b/src/Internal/Socket.php old mode 100644 new mode 100755 index a3ef34f..acb2ba8 --- a/src/Internal/Socket.php +++ b/src/Internal/Socket.php @@ -16,6 +16,7 @@ use LibDNS\Records\Question; use Revolt\EventLoop; use function Amp\now; +use function Amp\weakClosure; /** @internal */ abstract class Socket @@ -31,9 +32,9 @@ abstract public static function connect(string $uri): self; /** * Contains already sent queries with no response yet. For UDP this is exactly zero or one item. * - * @var array + * @var \ArrayObject */ - private array $pending = []; + private readonly \ArrayObject $pending; private readonly MessageFactory $messageFactory; @@ -42,17 +43,20 @@ abstract public static function connect(string $uri): self; private bool $receiving = false; - /** @var EventLoop\Suspension[] Queued requests if the number of concurrent requests is too large. */ - private array $queue = []; + /** @var \SplQueue Queued requests if the number of concurrent requests is too large. */ + private readonly \SplQueue $queue; /** * @param resource $socket */ protected function __construct($socket) { + $this->pending = new \ArrayObject(); + $this->queue = new \SplQueue(); + $this->input = new ReadableResourceStream($socket); $this->output = new WritableResourceStream($socket); - $this->messageFactory = new MessageFactory; + $this->messageFactory = new MessageFactory(); $this->lastActivity = now(); } @@ -60,14 +64,14 @@ private function fetch(): void { EventLoop::queue(function (): void { try { - $this->handleResolution(null, $this->receive()); + $this->handleResolution(message: $this->receive()); } catch (\Throwable $exception) { - $this->handleResolution($exception); + $this->handleResolution(exception: $exception); } }); } - private function handleResolution(?\Throwable $exception, ?Message $message = null): void + private function handleResolution(?\Throwable $exception = null, ?Message $message = null): void { $this->lastActivity = now(); $this->receiving = false; @@ -82,14 +86,15 @@ private function handleResolution(?\Throwable $exception, ?Message $message = nu // Ignore duplicate and invalid responses. if (isset($this->pending[$id]) && $this->matchesQuestion($message, $this->pending[$id]->question)) { - /** @var DeferredFuture $deferred */ - $deferred = $this->pending[$id]->deferred; + $pending = $this->pending[$id]; unset($this->pending[$id]); - $deferred->complete(static fn () => $message); + + $pending->deferred->complete(static fn () => $message); + $pending->deferred = null; } /** @psalm-suppress RedundantCondition */ - if (!$this->pending) { + if (!$this->pending->count()) { $this->input->unreference(); } elseif (!$this->receiving) { $this->input->reference(); @@ -112,9 +117,9 @@ final public function ask(Question $question, float $timeout, ?Cancellation $can { $this->lastActivity = now(); - if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) { + if ($this->pending->count() > self::MAX_CONCURRENT_REQUESTS) { $suspension = EventLoop::getSuspension(); - $this->queue[] = $suspension; + $this->queue->enqueue($suspension); $suspension->suspend(); } @@ -125,23 +130,30 @@ final public function ask(Question $question, float $timeout, ?Cancellation $can /** @var DeferredFuture<\Closure():Message> $deferred */ $deferred = new DeferredFuture; - $this->pending[$id] = new class($deferred, $question, $timeout) { + $this->pending[$id] = new class($this->pending, $id, $deferred, $question, $timeout) { private readonly string $callbackId; + public ?DeferredFuture $deferred; + public function __construct( - public readonly DeferredFuture $deferred, + \ArrayObject $pending, + int $id, + DeferredFuture $deferred, public readonly Question $question, float $timeout, ) { + $this->deferred = $deferred; + $this->callbackId = EventLoop::unreference(EventLoop::delay( $timeout, - static function () use ($deferred, $timeout): void { - if (!$deferred->isComplete()) { - $deferred->complete(static fn () => throw new TimeoutException( - "Didn't receive a response within {$timeout} seconds." - )); - } - }, + weakClosure(function () use ($id, $pending, $timeout): void { + $this->deferred?->complete(static fn () => throw new TimeoutException( + "Didn't receive a response within {$timeout} seconds." + )); + $this->deferred = null; + + unset($pending[$id]); + }), )); } @@ -172,12 +184,12 @@ public function __destruct() $callback = $deferred->getFuture()->await($cancellation); } finally { /** @psalm-suppress TypeDoesNotContainType */ - if (!$this->pending) { + if (!$this->pending->count()) { $this->input->unreference(); } - if ($this->queue) { - $suspension = \array_shift($this->queue); + if (!$this->queue->isEmpty()) { + $suspension = $this->queue->dequeue(); $suspension->resume(); } } @@ -233,20 +245,15 @@ private function error(\Throwable $exception): void $exception = new DnsException($message, 0, $exception); } - $pending = $this->pending; - $this->pending = []; + foreach ($this->pending as $id => $pendingQuestion) { + $pendingQuestion->deferred?->error($exception); + $pendingQuestion->deferred = null; - foreach ($pending as $pendingQuestion) { - /** @var DeferredFuture $deferred */ - $deferred = $pendingQuestion->deferred; - $deferred->error($exception); + unset($this->pending[$id]); } - $queue = $this->queue; - $this->queue = []; - - foreach ($queue as $suspension) { - $suspension->throw($exception); + while (!$this->queue->isEmpty()) { + $this->queue->dequeue()->throw($exception); } }