Skip to content

Commit

Permalink
Improve CAR (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
puklipo authored Dec 14, 2024
1 parent db3ed4a commit ab56cab
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 33 deletions.
116 changes: 94 additions & 22 deletions src/Support/CAR.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Revolution\Bluesky\Support;

use GuzzleHttp\Psr7\Utils;
use InvalidArgumentException;
use Psr\Http\Message\StreamInterface;
use YOCLIB\Multiformats\Multibase\Multibase;

/**
Expand All @@ -16,7 +19,7 @@ final class CAR
*
* Limited implementation, can be used to decode downloaded CAR files and Firehose.
*
* Only CIDv1, DAG-CBOR, and SHA2-256 format are supported.
* Only CIDv1, DAG-CBOR/RAW, and SHA2-256 format are supported.
*
* ```
* [$roots, $blocks] = CAR::decode('data');
Expand All @@ -33,24 +36,43 @@ final class CAR
* ]
* ```
*
* @return array<list<string>, array<string, array>>
* @return array{0: list<string>, 1: array<string, array>}
*/
public static function decode(string $data): array
public static function decode(StreamInterface|string $data): array
{
if (! $data instanceof StreamInterface) {
$data = Utils::streamFor($data);
}

$data->rewind();

$roots = self::decodeRoots($data);

$blocks = iterator_to_array(self::blockIterator($data));

rescue(fn () => $data->close());

return [$roots, $blocks];
}

/**
* Decode roots.
*
* @return list<string>
*/
public static function decodeRoots(string $data): array
public static function decodeRoots(StreamInterface|string $data): array
{
$header_length = Varint::decode(substr($data, 0, 1));
$header_bytes = substr($data, 1, $header_length);
if (! $data instanceof StreamInterface) {
$data = Utils::streamFor($data);
}

$data->rewind();

$header_length = Varint::decode($data->read(8));
$varint_len = strlen(Varint::encode($header_length));
$data->seek($varint_len);

$header_bytes = $data->read($header_length);
$header = CBOR::decode($header_bytes)->normalize();

$roots = data_get($header, 'roots');
Expand All @@ -65,36 +87,86 @@ public static function decodeRoots(string $data): array
}

/**
* @return iterable<string, array>
* Decode data block. Returns iterable.
*
* ```
* use Illuminate\Support\Arr;
*
* foreach (CAR::blockIterator($data) as $cid => $block) {
* if (Arr::exists($block, '$type')) {
* }
* }
* ```
* ```
* $blocks = iterator_to_array(CAR::blockIterator($data));
* ```
*
* @return iterable<string, mixed>
*/
public static function blockIterator(string $data): iterable
public static function blockIterator(StreamInterface|string $data): iterable
{
$header_length = Varint::decode(substr($data, 0, 1));
if (! $data instanceof StreamInterface) {
$data = Utils::streamFor($data);
}

$data->rewind();

$header_length = Varint::decode($data->read(1));

$offset = 1 + $header_length;

while ($offset < strlen($data)) {
$block_varint = rescue(fn () => Varint::decode(substr($data, $offset, 1)));
$cid_version = rescue(fn () => Varint::decode(substr($data, $offset + 1, 1)));
$cid_codec_varint = rescue(fn () => Varint::decode(substr($data, $offset + 2, 1)));
$cid_hash_varint = rescue(fn () => Varint::decode(substr($data, $offset + 3, 1)));
while ($offset < $data->getSize()) {
$data->seek($offset);

$block_varint = Varint::decode($data->read(8));
$varint_len = strlen(Varint::encode($block_varint));

$data->seek($offset + $varint_len);

// CIDv0 is not supported
$cid_0 = $data->read(2) === "\x12\x20";
if ($cid_0) {
$offset += $varint_len + $block_varint;

if (empty($block_varint) || $cid_version !== CID::CID_V1 || $cid_hash_varint !== CID::SHA2_256 || $cid_codec_varint !== CID::DAG_CBOR) {
$offset += 1;
continue;
}

$cid_hash_length = rescue(fn () => Varint::decode(substr($data, $offset + 4, 1)));
$cid_bytes = substr($data, $offset + 1, 4 + $cid_hash_length);
$data->seek(-2, SEEK_CUR);

$cid_version = rescue(fn () => Varint::decode($data->read(1)));

if ($cid_version !== CID::CID_V1) {
throw new InvalidArgumentException('Invalid CAR.');
}

$cid_codec = rescue(fn () => Varint::decode($data->read(1)));

// DAG-PB is not supported
if ($cid_codec === CID::DAG_PB) {
$offset += $varint_len + $block_varint;

continue;
}

$cid_hash_type = rescue(fn () => Varint::decode($data->read(1)));
$cid_hash_length = rescue(fn () => Varint::decode($data->read(1)));

$data->seek($offset + $varint_len);
$cid_bytes = $data->read(4 + $cid_hash_length);
$cid = Multibase::encode(Multibase::BASE32, $cid_bytes);

$block_length = $block_varint - 4 - $cid_hash_length;
$block_bytes = substr($data, $offset + 1 + 4 + $cid_hash_length, $block_length);
$block = rescue(fn () => CBOR::decode($block_bytes)->normalize());
$block_bytes = $data->read($block_length);

if ($cid_codec === CID::RAW) {
$block = $block_bytes;
} else {
$block = rescue(fn () => CBOR::decode($block_bytes)->normalize());
}

$offset = $offset + 1 + $block_varint;
$offset += $varint_len + $block_varint;

if (! empty($block) && is_array($block)) {
if (! empty($block)) {
yield $cid => $block;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Support/CID.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ final class CID

public const DAG_CBOR = 0x71;

public const DAG_PB = 0x20;

/**
* Encode to a specific format.
*
Expand Down
6 changes: 2 additions & 4 deletions src/Support/Varint.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ public static function encode(int $x): string
*/
public static function decode(string $str): int
{
$buf = str_split($str);
$buf = unpack('C*', $str);
$x = new BigInteger(0);
$s = 0;

foreach ($buf as $i => $b) {
$b = intval(bin2hex($b), 16);

throw_if($i >= 9);
throw_if($i >= 10);

if ($b < 0x80) {
throw_if($b === 0 && $s > 0);
Expand Down
148 changes: 141 additions & 7 deletions tests/Feature/Support/SupportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use CBOR\MapObject;
use CBOR\TextStringObject;
use Firebase\JWT\JWT;
use GuzzleHttp\Psr7\Utils;
use Illuminate\Support\Facades\Http;
use InvalidArgumentException;
use Revolution\Bluesky\Facades\Bluesky;
Expand All @@ -19,6 +21,7 @@
use Revolution\Bluesky\Support\Varint;
use RuntimeException;
use Tests\TestCase;
use YOCLIB\Multiformats\Multibase\Multibase;

class SupportTest extends TestCase
{
Expand Down Expand Up @@ -368,38 +371,169 @@ public function test_varint()
$this->assertSame("\xFF\x01", Varint::encode(0xFF));
$this->assertSame("\xAC\x02", Varint::encode(0x012C));
$this->assertSame("\x80\x80\x01", Varint::encode(0x4000));
$this->assertSame("\x83\x01", Varint::encode(131));

$this->assertSame(0x80, Varint::decode("\x80\x01"));
$this->assertSame(0xFF, Varint::decode("\xFF\x01"));
$this->assertSame(0x012C, Varint::decode("\xAC\x02"));
$this->assertSame(0x4000, Varint::decode("\x80\x80\x01"));
$this->assertSame(131, Varint::decode("\x83\x01"));
}

public function test_car_basic()
{
[$roots, $blocks] = CAR::decode(file_get_contents(__DIR__.'/fixture/carv1-basic.car'));

//dump($blocks);
$this->assertCount(2, $roots);
$this->assertArrayHasKey('bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm', $blocks);
}

public function test_car_basic_stream()
{
$data = Utils::streamFor(Utils::tryFopen(__DIR__.'/fixture/carv1-basic.car', 'rb'));

$data->seek(100);
$block_len = Varint::decode($data->read(8));
$pos = strlen(Varint::encode($block_len));
$data->seek(100 + $pos);
$this->assertSame(91, $block_len);
$ver1 = Varint::decode($data->read(1));
$ver0 = $data->read(2);
$this->assertSame(CID::CID_V1, $ver1);
$this->assertNotSame("\x12\x20", $ver0);

$data->seek(192);
$block_len = Varint::decode($data->read(8));
$pos = strlen(Varint::encode($block_len));
$data->seek(192 + $pos);
$this->assertSame(131, $block_len);
$ver0 = $data->read(2);
$this->assertSame("\x12\x20", $ver0);

$data->seek(100);
$block = $data->read(92);
$this->assertSame(91, Varint::decode(substr($block, 0, 1)));

$data->seek(325);
$this->assertSame(40, Varint::decode($data->read(1)));

$data->seek(362);
$this->assertSame('Y2NjYw', JWT::urlsafeB64Encode($data->read(4)));
}

public function test_car_download_repo()
{
$data = file_get_contents(__DIR__.'/fixture/bsky-app.car');

$roots = CAR::decodeRoots($data);
$this->assertCount(1, $roots);

$i = 0;
$blocks = iterator_to_array(CAR::blockIterator($data));
$this->assertArrayHasKey($roots[0], $blocks);
}

public function test_car_download_repo_stream()
{
$data = Utils::streamFor(utils::tryFopen(__DIR__.'/fixture/bsky-app.car', 'rb'));

$roots = CAR::decodeRoots($data);
$this->assertCount(1, $roots);

foreach (CAR::blockIterator($data) as $cid => $block) {
//dump($cid, $block);
$this->assertIsString($cid);
$this->assertIsArray($block);
$i++;
if ($i > 5) {
break;
}
$this->assertNotEmpty($cid);
$this->assertNotEmpty($block);
}
}

public function test_car_download_file_stream()
{
$data = Utils::streamFor(Utils::tryFopen(__DIR__.'/fixture/bsky-app.car', 'rb'));

$header_len = Varint::decode($data->read(1));
//dump($header_len);
$header = CBOR::decode($data->read($header_len))->normalize();
//dump($header);

$data->seek(1 + $header_len);

$data_len = Varint::decode($data->read(8));
$pos = strlen(Varint::encode($data_len));
$data->seek(1 + $header_len + $pos);
$this->assertSame(209, $data_len);
$this->assertSame(2, $pos);

$ver1 = Varint::decode($data->read(1));
$this->assertSame(CID::CID_V1, $ver1);

$codec = Varint::decode($data->read(1));
$this->assertSame(CID::DAG_CBOR, $codec);

$hash_algo = Varint::decode($data->read(1));
$this->assertSame(CID::SHA2_256, $hash_algo);

$hash_len = Varint::decode($data->read(1));
$this->assertSame(32, $hash_len);

$data->seek(-4, SEEK_CUR);
$hash = $data->read(4 + $hash_len);
$this->assertSame('bafyreig7gxhfmvwk4qzo2aliqytgdiflw6xwi4dkkqwjlgt7zjlvfbptcy', Multibase::encode(Multibase::BASE32, $hash));

$block_len = $data_len - 4 - $hash_len;

$block = $data->read($block_len);
$this->assertIsArray(CBOR::decode($block)->normalize());

$current = $data->tell();

//2
$data_len = Varint::decode($data->read(8));
$pos = strlen(Varint::encode($data_len));
$data->seek($current + $pos);
$this->assertSame(1733, $data_len);
$this->assertSame(2, $pos);

$ver1 = Varint::decode($data->read(1));
$this->assertSame(CID::CID_V1, $ver1);

$codec = Varint::decode($data->read(1));
$this->assertSame(CID::DAG_CBOR, $codec);

$hash_algo = Varint::decode($data->read(1));
$this->assertSame(CID::SHA2_256, $hash_algo);

$hash_len = Varint::decode($data->read(1));
$this->assertSame(32, $hash_len);

$data->seek(-4, SEEK_CUR);
$hash = $data->read(4 + $hash_len);
$this->assertSame('bafyreiejo5hn2rjcihcfz3txibzg3f7z5cmoxpdmloxamwlrrkgllafqy4', Multibase::encode(Multibase::BASE32, $hash));

$block_len = $data_len - 4 - $hash_len;

$block = $data->read($block_len);
$this->assertIsArray(CBOR::decode($block)->normalize());

$current = $data->tell();

//3
$data_len = Varint::decode($data->read(8));
$pos = strlen(Varint::encode($data_len));
$data->seek($current + $pos);
$this->assertSame(224, $data_len);
$this->assertSame(2, $pos);

$ver1 = Varint::decode($data->read(1));
$this->assertSame(CID::CID_V1, $ver1);

$codec = Varint::decode($data->read(1));
$this->assertSame(CID::DAG_CBOR, $codec);

$hash_algo = Varint::decode($data->read(1));
$this->assertSame(CID::SHA2_256, $hash_algo);

$hash_len = Varint::decode($data->read(1));
$this->assertSame(32, $hash_len);
}
}

0 comments on commit ab56cab

Please sign in to comment.