diff --git a/composer.json b/composer.json index 2e621d5..ee25076 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,8 @@ "require": { "php": "^8.1", "farzai/support": "^1.2", - "farzai/transport": "^1.2" + "farzai/transport": "^1.2", + "phrity/websocket": "^2.0" }, "require-dev": { "pestphp/pest": "^2.15", diff --git a/example/stream-trade.php b/example/stream-trade.php new file mode 100644 index 0000000..e59f83f --- /dev/null +++ b/example/stream-trade.php @@ -0,0 +1,33 @@ +setCredentials('YOUR_API_KEY', 'YOUR_SECRET') + ->build(); + +$websocket = new WebSocketClient($client); + +$websocket->listen([ + 'market.trade.thb_btc' => [ + function (Message $message) { + echo $message->json('sym').PHP_EOL; + }, + ], +]); + +$websocket->listen([ + 'market.trade.thb_eth' => function (Message $message) { + echo $message->json('sym').PHP_EOL; + }, +]); + +$websocket->listen('market.trade.thb_ada', function (Message $message) { + echo $message->json('sym').PHP_EOL; +}); + +$websocket->run(); diff --git a/src/Contracts/WebSocketEngineInterface.php b/src/Contracts/WebSocketEngineInterface.php new file mode 100644 index 0000000..014dcb2 --- /dev/null +++ b/src/Contracts/WebSocketEngineInterface.php @@ -0,0 +1,10 @@ +client = $client; } diff --git a/src/UriFactory.php b/src/UriFactory.php new file mode 100644 index 0000000..cb38704 --- /dev/null +++ b/src/UriFactory.php @@ -0,0 +1,14 @@ + + */ + private array $listeners = []; + + public function __construct( + private LoggerInterface $logger, + ) { + } + + public function addListener(string $event, callable $listener) + { + $this->listeners[$event][] = $listener; + + return $this; + } + + public function run(): void + { + $events = array_unique(array_keys($this->listeners)); + + $client = new WebSocketClient('wss://api.bitkub.com/websocket-api/'.implode(',', $events)); + + $client + ->addMiddleware(new WebSocketMiddleware\CloseHandler()) + ->addMiddleware(new WebSocketMiddleware\PingResponder()); + + $client->onText(function (WebSocketClient $client, WebSocketConnection $connection, WebSocketMessage $message) { + $receivedAt = Carbon::now(); + + $data = @json_decode($message->getContent(), true) ?? []; + if (! isset($data['stream'])) { + $this->logger->warning('[WebSocket] - '.Carbon::now()->format('Y-m-d H:i:s').' - Unknown data: '.$message->getContent()); + + return; + } + + $event = $data['stream']; + if (! isset($this->listeners[$event])) { + $this->logger->warning('[WebSocket] - '.Carbon::now()->format('Y-m-d H:i:s').' - Unknown event: '.$event); + + return; + } + + $message = new Message( + $message->getContent(), + $receivedAt->toDateTimeImmutable(), + ); + + foreach ($this->listeners[$event] as $listener) { + $this->logger->info('[WebSocket] - '.Carbon::now()->format('Y-m-d H:i:s').' - Event: '.$event); + + $listener($message); + } + }); + + $client->start(); + } +} diff --git a/src/WebSocket/Message.php b/src/WebSocket/Message.php new file mode 100644 index 0000000..fe1924c --- /dev/null +++ b/src/WebSocket/Message.php @@ -0,0 +1,97 @@ +body = $body; + $this->jsonDecoded = @json_decode($body, true) ?? false; + $this->receivedAt = $receivedAt; + } + + public function getBody(): string + { + return $this->body; + } + + public function getReceivedAt(): DateTimeImmutable + { + return $this->receivedAt; + } + + public function json($key = null) + { + if ($key === null) { + return $this->jsonDecoded ?: null; + } + + return Arr::get($this->jsonDecoded, $key); + } + + public function __toString(): string + { + return $this->getBody(); + } + + public function jsonSerialize(): array + { + return $this->toArray(); + } + + public function toArray(): array + { + return $this->jsonDecoded; + } + + public function offsetExists($offset): bool + { + return isset($this->jsonDecoded[$offset]); + } + + public function offsetGet($offset): mixed + { + return $this->jsonDecoded[$offset]; + } + + public function offsetSet($offset, $value): void + { + $this->jsonDecoded[$offset] = $value; + } + + public function offsetUnset($offset): void + { + unset($this->jsonDecoded[$offset]); + } + + public function __get($name) + { + return $this->jsonDecoded[$name] ?? null; + } + + public function __isset($name): bool + { + return isset($this->jsonDecoded[$name]); + } + + public function __set($name, $value): void + { + $this->jsonDecoded[$name] = $value; + } + + public function __unset($name): void + { + unset($this->jsonDecoded[$name]); + } +} diff --git a/src/WebSocketClient.php b/src/WebSocketClient.php new file mode 100644 index 0000000..a446ee5 --- /dev/null +++ b/src/WebSocketClient.php @@ -0,0 +1,93 @@ +> + */ + private array $listeners = []; + + public function __construct(ClientInterface $client) + { + $this->client = $client; + $this->websocket = new WebSocket\Engine($this->getLogger()); + } + + public function getConfig(): array + { + return $this->client->getConfig(); + } + + public function getTransport(): Transport + { + return $this->client->getTransport(); + } + + public function getLogger(): LoggerInterface + { + return $this->client->getLogger(); + } + + public function sendRequest(PsrRequestInterface $request) + { + return $this->client->sendRequest($request); + } + + /** + * Add event listener. + * + * @example $websocket->listen('market.trade.thb_btc', function (Message $message) { + * echo $message->json('sym').PHP_EOL; + * }); + * + * @param array>|string $listeners + */ + public function listen($listeners) + { + if (func_num_args() === 2) { + $eventName = func_get_arg(0); + $listener = is_array(func_get_arg(1)) ? func_get_arg(1) : [func_get_arg(1)]; + + $listeners = [$eventName => $listener]; + } + + foreach ($listeners as $event => $listener) { + if (! isset($this->listeners[$event])) { + $this->listeners[$event] = []; + } + + if (is_callable($listener)) { + $this->listeners[$event][] = $listener; + } elseif (is_array($listener)) { + foreach ($listener as $callback) { + $this->listeners[$event][] = $callback; + } + } + } + + return $this; + } + + public function run() + { + foreach ($this->listeners as $event => $listeners) { + foreach ($listeners as $listener) { + $this->websocket->addListener($event, $listener); + } + } + + $this->websocket->run(); + } +}