Skip to content

Commit

Permalink
Fix double resolution on timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 11, 2022
1 parent fa679cb commit 8fa7842
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions src/Internal/Socket.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use LibDNS\Records\Question;
use Revolt\EventLoop;
use function Amp\now;
use function Amp\weakClosure;

/** @internal */
abstract class Socket
Expand All @@ -31,9 +32,9 @@ abstract public static function connect(string $uri): self;
/**
* Contains already sent queries with no response yet. For UDP this is exactly zero or one item.
*
* @var array<int, object>
* @var \ArrayObject<int, object{deferred: DeferredFuture|null, question: Question}>
*/
private array $pending = [];
private readonly \ArrayObject $pending;

private readonly MessageFactory $messageFactory;

Expand All @@ -42,32 +43,35 @@ abstract public static function connect(string $uri): self;

private bool $receiving = false;

/** @var EventLoop\Suspension[] Queued requests if the number of concurrent requests is too large. */
private array $queue = [];
/** @var \SplQueue<EventLoop\Suspension> Queued requests if the number of concurrent requests is too large. */
private readonly \SplQueue $queue;

/**
* @param resource $socket
*/
protected function __construct($socket)
{
$this->pending = new \ArrayObject();
$this->queue = new \SplQueue();

$this->input = new ReadableResourceStream($socket);
$this->output = new WritableResourceStream($socket);
$this->messageFactory = new MessageFactory;
$this->messageFactory = new MessageFactory();
$this->lastActivity = now();
}

private function fetch(): void
{
EventLoop::queue(function (): void {
try {
$this->handleResolution(null, $this->receive());
$this->handleResolution(message: $this->receive());
} catch (\Throwable $exception) {
$this->handleResolution($exception);
$this->handleResolution(exception: $exception);
}
});
}

private function handleResolution(?\Throwable $exception, ?Message $message = null): void
private function handleResolution(?\Throwable $exception = null, ?Message $message = null): void
{
$this->lastActivity = now();
$this->receiving = false;
Expand All @@ -82,14 +86,15 @@ private function handleResolution(?\Throwable $exception, ?Message $message = nu

// Ignore duplicate and invalid responses.
if (isset($this->pending[$id]) && $this->matchesQuestion($message, $this->pending[$id]->question)) {
/** @var DeferredFuture $deferred */
$deferred = $this->pending[$id]->deferred;
$pending = $this->pending[$id];
unset($this->pending[$id]);
$deferred->complete(static fn () => $message);

$pending->deferred->complete(static fn () => $message);
$pending->deferred = null;
}

/** @psalm-suppress RedundantCondition */
if (!$this->pending) {
if (!$this->pending->count()) {
$this->input->unreference();
} elseif (!$this->receiving) {
$this->input->reference();
Expand All @@ -112,9 +117,9 @@ final public function ask(Question $question, float $timeout, ?Cancellation $can
{
$this->lastActivity = now();

if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) {
if ($this->pending->count() > self::MAX_CONCURRENT_REQUESTS) {
$suspension = EventLoop::getSuspension();
$this->queue[] = $suspension;
$this->queue->enqueue($suspension);
$suspension->suspend();
}

Expand All @@ -125,23 +130,30 @@ final public function ask(Question $question, float $timeout, ?Cancellation $can
/** @var DeferredFuture<\Closure():Message> $deferred */
$deferred = new DeferredFuture;

$this->pending[$id] = new class($deferred, $question, $timeout) {
$this->pending[$id] = new class($this->pending, $id, $deferred, $question, $timeout) {
private readonly string $callbackId;

public ?DeferredFuture $deferred;

public function __construct(
public readonly DeferredFuture $deferred,
\ArrayObject $pending,
int $id,
DeferredFuture $deferred,
public readonly Question $question,
float $timeout,
) {
$this->deferred = $deferred;

$this->callbackId = EventLoop::unreference(EventLoop::delay(
$timeout,
static function () use ($deferred, $timeout): void {
if (!$deferred->isComplete()) {
$deferred->complete(static fn () => throw new TimeoutException(
"Didn't receive a response within {$timeout} seconds."
));
}
},
weakClosure(function () use ($id, $pending, $timeout): void {
$this->deferred?->complete(static fn () => throw new TimeoutException(
"Didn't receive a response within {$timeout} seconds."
));
$this->deferred = null;

unset($pending[$id]);
}),
));
}

Expand Down Expand Up @@ -172,12 +184,12 @@ public function __destruct()
$callback = $deferred->getFuture()->await($cancellation);
} finally {
/** @psalm-suppress TypeDoesNotContainType */
if (!$this->pending) {
if (!$this->pending->count()) {
$this->input->unreference();
}

if ($this->queue) {
$suspension = \array_shift($this->queue);
if (!$this->queue->isEmpty()) {
$suspension = $this->queue->dequeue();
$suspension->resume();
}
}
Expand Down Expand Up @@ -233,20 +245,15 @@ private function error(\Throwable $exception): void
$exception = new DnsException($message, 0, $exception);
}

$pending = $this->pending;
$this->pending = [];
foreach ($this->pending as $id => $pendingQuestion) {
$pendingQuestion->deferred?->error($exception);
$pendingQuestion->deferred = null;

foreach ($pending as $pendingQuestion) {
/** @var DeferredFuture $deferred */
$deferred = $pendingQuestion->deferred;
$deferred->error($exception);
unset($this->pending[$id]);
}

$queue = $this->queue;
$this->queue = [];

foreach ($queue as $suspension) {
$suspension->throw($exception);
while (!$this->queue->isEmpty()) {
$this->queue->dequeue()->throw($exception);
}
}

Expand Down

0 comments on commit 8fa7842

Please sign in to comment.