From ab56caba774a54938b761fea32a768a64238dce8 Mon Sep 17 00:00:00 2001 From: puklipo Date: Sat, 14 Dec 2024 20:56:50 +0900 Subject: [PATCH] Improve CAR (#16) --- src/Support/CAR.php | 116 ++++++++++++++++---- src/Support/CID.php | 2 + src/Support/Varint.php | 6 +- tests/Feature/Support/SupportTest.php | 148 ++++++++++++++++++++++++-- 4 files changed, 239 insertions(+), 33 deletions(-) diff --git a/src/Support/CAR.php b/src/Support/CAR.php index 831951e..3923673 100644 --- a/src/Support/CAR.php +++ b/src/Support/CAR.php @@ -4,6 +4,9 @@ namespace Revolution\Bluesky\Support; +use GuzzleHttp\Psr7\Utils; +use InvalidArgumentException; +use Psr\Http\Message\StreamInterface; use YOCLIB\Multiformats\Multibase\Multibase; /** @@ -16,7 +19,7 @@ final class CAR * * Limited implementation, can be used to decode downloaded CAR files and Firehose. * - * Only CIDv1, DAG-CBOR, and SHA2-256 format are supported. + * Only CIDv1, DAG-CBOR/RAW, and SHA2-256 format are supported. * * ``` * [$roots, $blocks] = CAR::decode('data'); @@ -33,24 +36,43 @@ final class CAR * ] * ``` * - * @return array, array> + * @return array{0: list, 1: array} */ - public static function decode(string $data): array + public static function decode(StreamInterface|string $data): array { + if (! $data instanceof StreamInterface) { + $data = Utils::streamFor($data); + } + + $data->rewind(); + $roots = self::decodeRoots($data); $blocks = iterator_to_array(self::blockIterator($data)); + rescue(fn () => $data->close()); + return [$roots, $blocks]; } /** + * Decode roots. + * * @return list */ - public static function decodeRoots(string $data): array + public static function decodeRoots(StreamInterface|string $data): array { - $header_length = Varint::decode(substr($data, 0, 1)); - $header_bytes = substr($data, 1, $header_length); + if (! $data instanceof StreamInterface) { + $data = Utils::streamFor($data); + } + + $data->rewind(); + + $header_length = Varint::decode($data->read(8)); + $varint_len = strlen(Varint::encode($header_length)); + $data->seek($varint_len); + + $header_bytes = $data->read($header_length); $header = CBOR::decode($header_bytes)->normalize(); $roots = data_get($header, 'roots'); @@ -65,36 +87,86 @@ public static function decodeRoots(string $data): array } /** - * @return iterable + * Decode data block. Returns iterable. + * + * ``` + * use Illuminate\Support\Arr; + * + * foreach (CAR::blockIterator($data) as $cid => $block) { + * if (Arr::exists($block, '$type')) { + * } + * } + * ``` + * ``` + * $blocks = iterator_to_array(CAR::blockIterator($data)); + * ``` + * + * @return iterable */ - public static function blockIterator(string $data): iterable + public static function blockIterator(StreamInterface|string $data): iterable { - $header_length = Varint::decode(substr($data, 0, 1)); + if (! $data instanceof StreamInterface) { + $data = Utils::streamFor($data); + } + + $data->rewind(); + + $header_length = Varint::decode($data->read(1)); $offset = 1 + $header_length; - while ($offset < strlen($data)) { - $block_varint = rescue(fn () => Varint::decode(substr($data, $offset, 1))); - $cid_version = rescue(fn () => Varint::decode(substr($data, $offset + 1, 1))); - $cid_codec_varint = rescue(fn () => Varint::decode(substr($data, $offset + 2, 1))); - $cid_hash_varint = rescue(fn () => Varint::decode(substr($data, $offset + 3, 1))); + while ($offset < $data->getSize()) { + $data->seek($offset); + + $block_varint = Varint::decode($data->read(8)); + $varint_len = strlen(Varint::encode($block_varint)); + + $data->seek($offset + $varint_len); + + // CIDv0 is not supported + $cid_0 = $data->read(2) === "\x12\x20"; + if ($cid_0) { + $offset += $varint_len + $block_varint; - if (empty($block_varint) || $cid_version !== CID::CID_V1 || $cid_hash_varint !== CID::SHA2_256 || $cid_codec_varint !== CID::DAG_CBOR) { - $offset += 1; continue; } - $cid_hash_length = rescue(fn () => Varint::decode(substr($data, $offset + 4, 1))); - $cid_bytes = substr($data, $offset + 1, 4 + $cid_hash_length); + $data->seek(-2, SEEK_CUR); + + $cid_version = rescue(fn () => Varint::decode($data->read(1))); + + if ($cid_version !== CID::CID_V1) { + throw new InvalidArgumentException('Invalid CAR.'); + } + + $cid_codec = rescue(fn () => Varint::decode($data->read(1))); + + // DAG-PB is not supported + if ($cid_codec === CID::DAG_PB) { + $offset += $varint_len + $block_varint; + + continue; + } + + $cid_hash_type = rescue(fn () => Varint::decode($data->read(1))); + $cid_hash_length = rescue(fn () => Varint::decode($data->read(1))); + + $data->seek($offset + $varint_len); + $cid_bytes = $data->read(4 + $cid_hash_length); $cid = Multibase::encode(Multibase::BASE32, $cid_bytes); $block_length = $block_varint - 4 - $cid_hash_length; - $block_bytes = substr($data, $offset + 1 + 4 + $cid_hash_length, $block_length); - $block = rescue(fn () => CBOR::decode($block_bytes)->normalize()); + $block_bytes = $data->read($block_length); + + if ($cid_codec === CID::RAW) { + $block = $block_bytes; + } else { + $block = rescue(fn () => CBOR::decode($block_bytes)->normalize()); + } - $offset = $offset + 1 + $block_varint; + $offset += $varint_len + $block_varint; - if (! empty($block) && is_array($block)) { + if (! empty($block)) { yield $cid => $block; } } diff --git a/src/Support/CID.php b/src/Support/CID.php index 47d0234..9068fac 100644 --- a/src/Support/CID.php +++ b/src/Support/CID.php @@ -22,6 +22,8 @@ final class CID public const DAG_CBOR = 0x71; + public const DAG_PB = 0x20; + /** * Encode to a specific format. * diff --git a/src/Support/Varint.php b/src/Support/Varint.php index f97a2cf..34b489b 100644 --- a/src/Support/Varint.php +++ b/src/Support/Varint.php @@ -41,14 +41,12 @@ public static function encode(int $x): string */ public static function decode(string $str): int { - $buf = str_split($str); + $buf = unpack('C*', $str); $x = new BigInteger(0); $s = 0; foreach ($buf as $i => $b) { - $b = intval(bin2hex($b), 16); - - throw_if($i >= 9); + throw_if($i >= 10); if ($b < 0x80) { throw_if($b === 0 && $s > 0); diff --git a/tests/Feature/Support/SupportTest.php b/tests/Feature/Support/SupportTest.php index e3c2d4b..2ff4f98 100644 --- a/tests/Feature/Support/SupportTest.php +++ b/tests/Feature/Support/SupportTest.php @@ -6,6 +6,8 @@ use CBOR\MapObject; use CBOR\TextStringObject; +use Firebase\JWT\JWT; +use GuzzleHttp\Psr7\Utils; use Illuminate\Support\Facades\Http; use InvalidArgumentException; use Revolution\Bluesky\Facades\Bluesky; @@ -19,6 +21,7 @@ use Revolution\Bluesky\Support\Varint; use RuntimeException; use Tests\TestCase; +use YOCLIB\Multiformats\Multibase\Multibase; class SupportTest extends TestCase { @@ -368,21 +371,57 @@ public function test_varint() $this->assertSame("\xFF\x01", Varint::encode(0xFF)); $this->assertSame("\xAC\x02", Varint::encode(0x012C)); $this->assertSame("\x80\x80\x01", Varint::encode(0x4000)); + $this->assertSame("\x83\x01", Varint::encode(131)); $this->assertSame(0x80, Varint::decode("\x80\x01")); $this->assertSame(0xFF, Varint::decode("\xFF\x01")); $this->assertSame(0x012C, Varint::decode("\xAC\x02")); $this->assertSame(0x4000, Varint::decode("\x80\x80\x01")); + $this->assertSame(131, Varint::decode("\x83\x01")); } public function test_car_basic() { [$roots, $blocks] = CAR::decode(file_get_contents(__DIR__.'/fixture/carv1-basic.car')); + //dump($blocks); $this->assertCount(2, $roots); $this->assertArrayHasKey('bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm', $blocks); } + public function test_car_basic_stream() + { + $data = Utils::streamFor(Utils::tryFopen(__DIR__.'/fixture/carv1-basic.car', 'rb')); + + $data->seek(100); + $block_len = Varint::decode($data->read(8)); + $pos = strlen(Varint::encode($block_len)); + $data->seek(100 + $pos); + $this->assertSame(91, $block_len); + $ver1 = Varint::decode($data->read(1)); + $ver0 = $data->read(2); + $this->assertSame(CID::CID_V1, $ver1); + $this->assertNotSame("\x12\x20", $ver0); + + $data->seek(192); + $block_len = Varint::decode($data->read(8)); + $pos = strlen(Varint::encode($block_len)); + $data->seek(192 + $pos); + $this->assertSame(131, $block_len); + $ver0 = $data->read(2); + $this->assertSame("\x12\x20", $ver0); + + $data->seek(100); + $block = $data->read(92); + $this->assertSame(91, Varint::decode(substr($block, 0, 1))); + + $data->seek(325); + $this->assertSame(40, Varint::decode($data->read(1))); + + $data->seek(362); + $this->assertSame('Y2NjYw', JWT::urlsafeB64Encode($data->read(4))); + } + public function test_car_download_repo() { $data = file_get_contents(__DIR__.'/fixture/bsky-app.car'); @@ -390,16 +429,111 @@ public function test_car_download_repo() $roots = CAR::decodeRoots($data); $this->assertCount(1, $roots); - $i = 0; + $blocks = iterator_to_array(CAR::blockIterator($data)); + $this->assertArrayHasKey($roots[0], $blocks); + } + + public function test_car_download_repo_stream() + { + $data = Utils::streamFor(utils::tryFopen(__DIR__.'/fixture/bsky-app.car', 'rb')); + + $roots = CAR::decodeRoots($data); + $this->assertCount(1, $roots); foreach (CAR::blockIterator($data) as $cid => $block) { //dump($cid, $block); - $this->assertIsString($cid); - $this->assertIsArray($block); - $i++; - if ($i > 5) { - break; - } + $this->assertNotEmpty($cid); + $this->assertNotEmpty($block); } } + + public function test_car_download_file_stream() + { + $data = Utils::streamFor(Utils::tryFopen(__DIR__.'/fixture/bsky-app.car', 'rb')); + + $header_len = Varint::decode($data->read(1)); + //dump($header_len); + $header = CBOR::decode($data->read($header_len))->normalize(); + //dump($header); + + $data->seek(1 + $header_len); + + $data_len = Varint::decode($data->read(8)); + $pos = strlen(Varint::encode($data_len)); + $data->seek(1 + $header_len + $pos); + $this->assertSame(209, $data_len); + $this->assertSame(2, $pos); + + $ver1 = Varint::decode($data->read(1)); + $this->assertSame(CID::CID_V1, $ver1); + + $codec = Varint::decode($data->read(1)); + $this->assertSame(CID::DAG_CBOR, $codec); + + $hash_algo = Varint::decode($data->read(1)); + $this->assertSame(CID::SHA2_256, $hash_algo); + + $hash_len = Varint::decode($data->read(1)); + $this->assertSame(32, $hash_len); + + $data->seek(-4, SEEK_CUR); + $hash = $data->read(4 + $hash_len); + $this->assertSame('bafyreig7gxhfmvwk4qzo2aliqytgdiflw6xwi4dkkqwjlgt7zjlvfbptcy', Multibase::encode(Multibase::BASE32, $hash)); + + $block_len = $data_len - 4 - $hash_len; + + $block = $data->read($block_len); + $this->assertIsArray(CBOR::decode($block)->normalize()); + + $current = $data->tell(); + + //2 + $data_len = Varint::decode($data->read(8)); + $pos = strlen(Varint::encode($data_len)); + $data->seek($current + $pos); + $this->assertSame(1733, $data_len); + $this->assertSame(2, $pos); + + $ver1 = Varint::decode($data->read(1)); + $this->assertSame(CID::CID_V1, $ver1); + + $codec = Varint::decode($data->read(1)); + $this->assertSame(CID::DAG_CBOR, $codec); + + $hash_algo = Varint::decode($data->read(1)); + $this->assertSame(CID::SHA2_256, $hash_algo); + + $hash_len = Varint::decode($data->read(1)); + $this->assertSame(32, $hash_len); + + $data->seek(-4, SEEK_CUR); + $hash = $data->read(4 + $hash_len); + $this->assertSame('bafyreiejo5hn2rjcihcfz3txibzg3f7z5cmoxpdmloxamwlrrkgllafqy4', Multibase::encode(Multibase::BASE32, $hash)); + + $block_len = $data_len - 4 - $hash_len; + + $block = $data->read($block_len); + $this->assertIsArray(CBOR::decode($block)->normalize()); + + $current = $data->tell(); + + //3 + $data_len = Varint::decode($data->read(8)); + $pos = strlen(Varint::encode($data_len)); + $data->seek($current + $pos); + $this->assertSame(224, $data_len); + $this->assertSame(2, $pos); + + $ver1 = Varint::decode($data->read(1)); + $this->assertSame(CID::CID_V1, $ver1); + + $codec = Varint::decode($data->read(1)); + $this->assertSame(CID::DAG_CBOR, $codec); + + $hash_algo = Varint::decode($data->read(1)); + $this->assertSame(CID::SHA2_256, $hash_algo); + + $hash_len = Varint::decode($data->read(1)); + $this->assertSame(32, $hash_len); + } }