Skip to content

Commit

Permalink
Allow reusing of outstanding requests
Browse files Browse the repository at this point in the history
Refactoring to ensure only one concurrent request is sent to the server for a unique name/type tuple
  • Loading branch information
DaveRandom committed Jun 16, 2014
1 parent f7e199f commit 8e82108
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 49 deletions.
162 changes: 120 additions & 42 deletions lib/Addr/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,28 @@ class Client
/**
* @var array
*/
private $outstandingLookups = [];
private $pendingLookups = [];

/**
* @var array
*/
private $pendingRequestsByNameAndType = [];

/**
* @var array
*/
private $pendingRequestsById = [];

/**
* @var int
*/
private $requestIdCounter = 0;

/**
* @var int
*/
private $lookupIdCounter = 0;

/**
* Constructor
*
Expand All @@ -69,9 +84,9 @@ public function __construct(
$this->requestBuilder = $requestBuilder;
$this->responseInterpreter = $responseInterpreter;

$serverAddress = $serverAddress === null ? (string)$serverAddress : '8.8.8.8';
$serverPort = $serverPort === null ? (int)$serverPort : 53;
$requestTimeout = $requestTimeout === null ? (int)$requestTimeout : 2000;
$serverAddress = $serverAddress !== null ? (string)$serverAddress : '8.8.8.8';
$serverPort = $serverPort !== null ? (int)$serverPort : 53;
$requestTimeout = $requestTimeout !== null ? (int)$requestTimeout : 2000;

$address = sprintf('udp://%s:%d', $serverAddress, $serverPort);
$this->socket = stream_socket_client($address, $errNo, $errStr);
Expand All @@ -96,7 +111,25 @@ private function getNextFreeRequestId()
if ($this->requestIdCounter >= 65536) {
$this->requestIdCounter = 0;
}
} while(isset($this->outstandingLookups[$result]));
} while(isset($this->pendingRequestsById[$result]));

return $result;
}

/**
* Get the next available lookup ID
*
* @return int
*/
private function getNextFreeLookupId()
{
do {
$result = $this->lookupIdCounter++;

if ($this->lookupIdCounter >= PHP_INT_MAX) {
$this->lookupIdCounter = 0;
}
} while(isset($this->pendingLookups[$result]));

return $result;
}
Expand Down Expand Up @@ -130,6 +163,34 @@ private function getRequestList($mode)
return $result;
}

/**
* Send a request to the server
*
* @param array $request
*/
private function sendRequest($request)
{
$packet = $this->requestBuilder->buildRequest($request['id'], $request['name'], $request['type']);
fwrite($this->socket, $packet);

$request['timeout_id'] = $this->reactor->once(function() use($request) {
unset($this->pendingRequestsByNameAndType[$request['name']][$request['type']]);

foreach ($request['lookups'] as $id => $lookup) {
$this->completePendingLookup($id, null, ResolutionErrors::ERR_SERVER_TIMEOUT);
}
}, $this->requestTimeout);

if ($this->readWatcherId === null) {
$this->readWatcherId = $this->reactor->onReadable($this->socket, function() {
$this->onSocketReadable();
});
}

$this->pendingRequestsById[$request['id']] = $request;
$this->pendingRequestsByNameAndType[$request['name']][$request['type']] = &$this->pendingRequestsById[$request['id']];
}

/**
* Handle data waiting to be read from the socket
*/
Expand All @@ -143,10 +204,29 @@ private function onSocketReadable()
}

list($id, $addr, $ttl) = $response;
$request = $this->pendingRequestsById[$id];
$type = $request['type'];
$name = $request['name'];

$this->reactor->cancel($request['timeout_id']);
unset($this->pendingRequestsById[$id], $this->pendingRequestsByNameAndType[$name][$type]);
if (!$this->pendingRequestsById) {
$this->reactor->cancel($this->readWatcherId);
$this->readWatcherId = null;
}

if ($addr !== null) {
$this->completeOutstandingRequest($id, $addr, $this->outstandingLookups[$id]['last_type'], $ttl);
foreach ($request['lookups'] as $id => $lookup) {
$this->completePendingLookup($id, $addr, $type);
}

if ($request['cache_store']) {
call_user_func($request['cache_store'], $name, $addr, $type, $ttl);
}
} else {
$this->processOutstandingLookup($id);
foreach ($request['lookups'] as $id => $lookup) {
$this->processPendingLookup($id);
}
}
}

Expand All @@ -156,46 +236,44 @@ private function onSocketReadable()
* @param int $id
* @param string $addr
* @param int $type
* @param int $ttl
*/
private function completeOutstandingRequest($id, $addr, $type, $ttl = null)
private function completePendingLookup($id, $addr, $type)
{
$this->reactor->cancel($this->outstandingLookups[$id]['timeout_id']);
call_user_func($this->outstandingLookups[$id]['callback'], $addr, $type, $ttl);
unset($this->outstandingLookups[$id]);

if (!$this->outstandingLookups) {
$this->reactor->cancel($this->readWatcherId);
$this->readWatcherId = null;
}
call_user_func($this->pendingLookups[$id]['callback'], $addr, $type);
unset($this->pendingLookups[$id]);
}

/**
* Send a request to the server
*
* @param int $id
*/
private function processOutstandingLookup($id)
private function processPendingLookup($id)
{
if (!$this->outstandingLookups[$id]['requests']) {
$this->completeOutstandingRequest($id, null, ResolutionErrors::ERR_NO_RECORD);
$lookup = &$this->pendingLookups[$id];

if (!$lookup['requests']) {
$this->completePendingLookup($id, null, ResolutionErrors::ERR_NO_RECORD);
return;
}

$type = array_shift($this->outstandingLookups[$id]['requests']);
$this->outstandingLookups[$id]['last_type'] = $type;
$name = $lookup['name'];
$type = array_shift($lookup['requests']);
$lookup['last_type'] = $type;

$packet = $this->requestBuilder->buildRequest($id, $this->outstandingLookups[$id]['name'], $type);
fwrite($this->socket, $packet);
$this->pendingRequestsByNameAndType[$name][$type]['lookups'][$id] = $lookup;

$this->outstandingLookups[$id]['timeout_id'] = $this->reactor->once(function() use($id) {
$this->completeOutstandingRequest($id, null, ResolutionErrors::ERR_SERVER_TIMEOUT);
}, $this->requestTimeout);
if (count($this->pendingRequestsByNameAndType[$name][$type]) === 1) {
$request = [
'id' => $this->getNextFreeRequestId(),
'name' => $name,
'type' => $type,
'lookups' => [$id => $lookup],
'timeout_id' => null,
'cache_store' => $lookup['cache_store'],
];

if ($this->readWatcherId === null) {
$this->readWatcherId = $this->reactor->onReadable($this->socket, function() {
$this->onSocketReadable();
});
$this->sendRequest($request);
}
}

Expand All @@ -205,20 +283,20 @@ private function processOutstandingLookup($id)
* @param string $name
* @param int $mode
* @param callable $callback
* @param callable $cacheStore
*/
public function resolve($name, $mode, callable $callback)
public function resolve($name, $mode, callable $callback, callable $cacheStore = null)
{
$requests = $this->getRequestList($mode);
$id = $this->getNextFreeRequestId();

$this->outstandingLookups[$id] = [
'name' => $name,
'requests' => $requests,
'last_type' => null,
'timeout_id' => null,
'callback' => $callback
$id = $this->getNextFreeLookupId();

$this->pendingLookups[$id] = [
'name' => $name,
'requests' => $this->getRequestList($mode),
'last_type' => null,
'callback' => $callback,
'cache_store' => $cacheStore,
];

$this->processOutstandingLookup($id);
$this->processPendingLookup($id);
}
}
17 changes: 10 additions & 7 deletions lib/Addr/Resolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ class Resolver
*/
private $hostsFile;

/**
* @var callable
*/
private $cacheStoreCallback;

/**
* Constructor
*
Expand All @@ -52,6 +57,10 @@ public function __construct(
$this->client = $client;
$this->cache = $cache;
$this->hostsFile = $hostsFile;

if ($cache) {
$this->cacheStoreCallback = [$cache, 'store'];
}
}

/**
Expand Down Expand Up @@ -158,13 +167,7 @@ private function resolveFromServer($name, $mode, $callback)
return;
}

$this->client->resolve($name, $mode, function($addr, $type, $ttl) use($name, $callback) {
if ($addr !== null && $this->cache) {
$this->cache->store($name, $addr, $type, $ttl);
}

call_user_func($callback, $addr, $type);
});
$this->client->resolve($name, $mode, $callback, $this->cacheStoreCallback);
}

/**
Expand Down

0 comments on commit 8e82108

Please sign in to comment.