@@ -4,8 +4,14 @@ import { ActivityState } from './activity-state.js'
4
4
import { SPARK_VERSION , MAX_CAR_SIZE , APPROX_ROUND_LENGTH_IN_MS } from './constants.js'
5
5
import { queryTheIndex } from './ipni-client.js'
6
6
import { getMinerPeerId as defaultGetMinerPeerId } from './miner-info.js'
7
+ import { multiaddrToHttpUrl } from './multiaddr.js'
8
+
7
9
import {
8
- encodeHex
10
+ CarBlockIterator ,
11
+ encodeHex ,
12
+ HashMismatchError ,
13
+ UnsupportedHashError ,
14
+ validateBlock
9
15
} from '../vendor/deno-deps.js'
10
16
11
17
const sleep = dt => new Promise ( resolve => setTimeout ( resolve , dt ) )
@@ -77,25 +83,10 @@ export default class Spark {
77
83
stats . protocol = provider . protocol
78
84
stats . providerAddress = provider . address
79
85
80
- const searchParams = new URLSearchParams ( {
81
- // See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
82
- // Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
83
- 'dag-scope' : 'block' ,
84
- protocols : provider . protocol ,
85
- providers : provider . address
86
- } )
87
- const url = `ipfs://${ retrieval . cid } ?${ searchParams . toString ( ) } `
88
- try {
89
- await this . fetchCAR ( url , stats )
90
- } catch ( err ) {
91
- console . error ( `Failed to fetch ${ url } ` )
92
- console . error ( err )
93
- }
86
+ await this . fetchCAR ( provider . protocol , provider . address , retrieval . cid , stats )
94
87
}
95
88
96
- async fetchCAR ( url , stats ) {
97
- console . log ( `Fetching: ${ url } ` )
98
-
89
+ async fetchCAR ( protocol , address , cid , stats ) {
99
90
// Abort if no progress was made for 60 seconds
100
91
const controller = new AbortController ( )
101
92
const { signal } = controller
@@ -116,6 +107,9 @@ export default class Spark {
116
107
const carBytes = new Uint8Array ( carBuffer )
117
108
118
109
try {
110
+ const url = getRetrievalUrl ( protocol , address , cid )
111
+ console . log ( `Fetching: ${ url } ` )
112
+
119
113
resetTimeout ( )
120
114
const res = await this . #fetch( url , { signal } )
121
115
stats . statusCode = res . status
@@ -146,6 +140,8 @@ export default class Spark {
146
140
}
147
141
148
142
if ( ! stats . carTooLarge ) {
143
+ await verifyContent ( cid , carBytes )
144
+
149
145
const digest = await crypto . subtle . digest ( 'sha-256' , carBytes )
150
146
// 12 is the code for sha2-256
151
147
// 20 is the digest length (32 bytes = 256 bits)
@@ -155,6 +151,12 @@ export default class Spark {
155
151
console . error ( 'Retrieval failed with status code %s: %s' ,
156
152
res . status , ( await res . text ( ) ) . trimEnd ( ) )
157
153
}
154
+ } catch ( err ) {
155
+ console . error ( `Failed to fetch ${ cid } from ${ address } using ${ protocol } ` )
156
+ console . error ( err )
157
+ if ( ! stats . statusCode || stats . statusCode === 200 ) {
158
+ stats . statusCode = mapErrorToStatusCode ( err )
159
+ }
158
160
} finally {
159
161
clearTimeout ( timeout )
160
162
}
@@ -190,18 +192,7 @@ export default class Spark {
190
192
async nextRetrieval ( ) {
191
193
const { id : retrievalId , ...retrieval } = await this . getRetrieval ( )
192
194
193
- const stats = {
194
- timeout : false ,
195
- startAt : new Date ( ) ,
196
- firstByteAt : null ,
197
- endAt : null ,
198
- carTooLarge : false ,
199
- byteLength : 0 ,
200
- carChecksum : null ,
201
- statusCode : null ,
202
- providerId : null ,
203
- indexerResult : null
204
- }
195
+ const stats = newStats ( )
205
196
206
197
await this . executeRetrievalCheck ( retrieval , stats )
207
198
@@ -240,6 +231,96 @@ export default class Spark {
240
231
}
241
232
}
242
233
234
/**
 * Create a fresh stats record for a single retrieval check.
 *
 * Every call returns a new object; `startAt` is set to the time of the call and
 * the remaining fields start out in their "nothing measured yet" state.
 *
 * @returns {{
 *   timeout: boolean,
 *   startAt: Date,
 *   firstByteAt: null,
 *   endAt: null,
 *   carTooLarge: boolean,
 *   byteLength: number,
 *   carChecksum: null,
 *   statusCode: null
 * }}
 */
export function newStats () {
  const startedAt = new Date()

  const stats = {
    timeout: false,
    startAt: startedAt,
    firstByteAt: null,
    endAt: null,
    carTooLarge: false,
    byteLength: 0,
    carChecksum: null,
    statusCode: null
  }

  return stats
}
246
+
247
/**
 * Build the URL used to fetch the root block of `cid` from a provider.
 *
 * For the `http` protocol the provider multiaddr is converted with
 * `multiaddrToHttpUrl` and the content is addressed via the `/ipfs/<cid>` path.
 * For any other protocol an `ipfs://` URL is produced that carries the protocol
 * and the provider address as query parameters.
 *
 * @param {string} protocol
 * @param {string} address
 * @param {string} cid
 * @returns {string}
 */
function getRetrievalUrl (protocol, address, cid) {
  if (protocol !== 'http') {
    const query = new URLSearchParams()
    // See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
    // Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
    query.set('dag-scope', 'block')
    query.set('protocols', protocol)
    query.set('providers', address)
    return `ipfs://${cid}?${query.toString()}`
  }

  const origin = multiaddrToHttpUrl(address)
  return `${origin}/ipfs/${cid}?dag-scope=block`
}
262
+
263
/**
 * Verify that a CAR payload contains only valid blocks for the requested CID.
 *
 * The bytes are parsed with `CarBlockIterator`. Each block must carry the
 * requested CID and must pass `validateBlock`; the first block failing either
 * check aborts the verification.
 *
 * @param {string} cid
 * @param {Uint8Array} carBytes
 * @returns {Promise<void>}
 * @throws The parser's error tagged with `code: 'CANNOT_PARSE_CAR_BYTES'` when the
 *   bytes cannot be read as a CAR, an `Error` with `code: 'UNEXPECTED_CAR_BLOCK'`
 *   when a block has a different CID, or whatever `validateBlock` rejects with.
 */
async function verifyContent (cid, carBytes) {
  const hasRequestedCid = block => block.cid.toString() === cid.toString()

  let blocks
  try {
    blocks = await CarBlockIterator.fromBytes(carBytes)
  } catch (parseError) {
    // Tag the original error so that callers can tell parse failures apart
    const tagged = Object.assign(parseError, { code: 'CANNOT_PARSE_CAR_BYTES' })
    throw tagged
  }

  for await (const block of blocks) {
    if (hasRequestedCid(block)) {
      await validateBlock(block)
      continue
    }

    const mismatch = new Error(`Unexpected block CID ${block.cid}. Expected: ${cid}`)
    throw Object.assign(mismatch, { code: 'UNEXPECTED_CAR_BLOCK' })
  }
}
286
+
287
/**
 * Map an error to a synthetic numeric status code.
 *
 * - 701-704: multiaddr parsing errors, recognised by `err.code`
 * - 901-904: content verification errors, recognised by error class or `err.code`
 * - 801-802: network connection errors, recognised by the error message text
 * - 600: fallback for everything else
 *
 * The function never throws: values without a `code` or without a string
 * `message` (including `null` and `undefined`) map to the fallback code.
 *
 * @param {unknown} err The value that was thrown
 * @returns {number}
 */
function mapErrorToStatusCode (err) {
  // Anything can be thrown in JavaScript. Read the fields defensively so that an
  // unusual value ends up with the fallback code instead of raising a TypeError
  // from inside the caller's error handler and hiding the original failure.
  const code = err?.code
  const message = typeof err?.message === 'string' ? err.message : ''

  // 7xx codes for multiaddr parsing errors
  switch (code) {
    case 'UNSUPPORTED_MULTIADDR_HOST_TYPE':
      return 701
    case 'UNSUPPORTED_MULTIADDR_PROTO':
      return 702
    case 'UNSUPPORTED_MULTIADDR_SCHEME':
      return 703
    case 'MULTIADDR_HAS_TOO_MANY_PARTS':
      return 704
  }

  // 9xx for content verification errors
  if (err instanceof UnsupportedHashError) {
    return 901
  } else if (err instanceof HashMismatchError) {
    return 902
  } else if (code === 'UNEXPECTED_CAR_BLOCK') {
    return 903
  } else if (code === 'CANNOT_PARSE_CAR_BYTES') {
    return 904
  }

  // 8xx errors for network connection errors
  // Unfortunately, the Fetch API does not support programmatic detection of various error
  // conditions. We have to check the error message text.
  if (message.includes('dns error')) {
    return 801
  } else if (message.includes('tcp connect error')) {
    return 802
  }

  // Fallback code for unknown errors
  return 600
}
323
+
243
324
async function assertOkResponse ( res , errorMsg ) {
244
325
if ( res . ok ) return
245
326
0 commit comments