Skip to content

Commit f9aa7ae

Browse files
committed
feat: deterministic tasking
Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>
1 parent e9be757 commit f9aa7ae

6 files changed

+291
-44
lines changed

lib/http-assertions.js

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { AssertionError } from 'zinnia:assert'
2+
3+
/**
 * Build the error thrown when a response does not meet expectations.
 *
 * The response body is read on a best-effort basis so that the server's own
 * explanation ends up in the error message. When the body cannot be read, the
 * read failure is preserved as the `cause` of the returned error instead of
 * being discarded.
 *
 * @param {Response} res
 * @param {string} summary Human-readable description of what went wrong
 * @returns {Promise<Error>} Error with `statusCode` and `serverMessage` set
 */
async function createResponseError (res, summary) {
  let body
  let readFailed = false
  let readFailure
  try {
    body = await res.text()
  } catch (err) {
    // Do not let a broken body stream mask the HTTP-level failure we are
    // about to report; remember it so it can be attached as the cause.
    readFailed = true
    readFailure = err
  }

  const message = `${summary} (${res.status}): ${body?.trimEnd()}`
  const error = new Error(message, readFailed ? { cause: readFailure } : undefined)
  error.statusCode = res.status
  error.serverMessage = body
  return error
}

/**
 * Assert that the response has a successful (`ok`) status.
 *
 * @param {Response} res
 * @param {string} [errorMsg] Custom prefix for the error message
 * @returns {Promise<void>}
 * @throws {Error} When `res.ok` is falsy. The error carries `statusCode` and
 *   `serverMessage` (the raw response body, or undefined if unreadable).
 */
export async function assertOkResponse (res, errorMsg) {
  if (res.ok) return

  throw await createResponseError(res, errorMsg ?? 'Fetch failed')
}

/**
 * Assert that the response is a redirect and return its target.
 *
 * @param {Response} res
 * @param {string} [errorMsg] Custom prefix for the error message
 * @returns {Promise<string>} Value of the `Location` response header
 * @throws {AssertionError} When the status is a redirect but the `Location`
 *   header is missing.
 * @throws {Error} When the status is not one of the accepted redirect codes.
 *   The error carries `statusCode` and `serverMessage`.
 */
export async function assertRedirectResponse (res, errorMsg) {
  // NOTE(review): 304 Not Modified usually carries no Location header, so it
  // will most likely end up in the "missing Location header" branch below.
  if ([301, 302, 303, 304, 307, 308].includes(res.status)) {
    const location = res.headers.get('location')
    if (!location) {
      const msg = 'The server response is missing the Location header. Headers found:\n' +
        Array.from(res.headers.keys()).join('\n')
      throw new AssertionError(msg)
    }
    return location
  }

  throw await createResponseError(res, errorMsg ?? 'Server did not respond with redirection')
}

lib/spark.js

+17-32
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import { ActivityState } from './activity-state.js'
44
import { SPARK_VERSION, MAX_CAR_SIZE, APPROX_ROUND_LENGTH_IN_MS } from './constants.js'
55
import { queryTheIndex } from './ipni-client.js'
6+
import { assertOkResponse } from './http-assertions.js'
67
import { getMinerPeerId as defaultGetMinerPeerId } from './miner-info.js'
78
import { multiaddrToHttpUrl } from './multiaddr.js'
9+
import { Tasker } from './tasker.js'
810

911
import {
1012
CarBlockIterator,
@@ -20,33 +22,25 @@ export default class Spark {
2022
#fetch
2123
#getMinerPeerId
2224
#activity = new ActivityState()
23-
#maxTasksPerNode = 360
25+
#tasker
2426

2527
constructor ({
2628
fetch = globalThis.fetch,
2729
getMinerPeerId = defaultGetMinerPeerId
2830
} = {}) {
2931
this.#fetch = fetch
3032
this.#getMinerPeerId = getMinerPeerId
33+
this.#tasker = new Tasker({
34+
fetch: this.#fetch,
35+
activityState: this.#activity
36+
})
3137
}
3238

3339
async getRetrieval () {
34-
console.log('Getting current SPARK round details...')
35-
const res = await this.#fetch('https://api.filspark.com/rounds/current', {
36-
method: 'GET',
37-
headers: { 'Content-Type': 'application/json' },
38-
signal: AbortSignal.timeout(10_000)
39-
})
40-
await assertOkResponse(res, 'Failed to fetch the current SPARK round')
41-
this.#activity.onHealthy()
42-
const { retrievalTasks, maxTasksPerNode, ...round } = await res.json()
43-
console.log('Current SPARK round:', round)
44-
console.log(' %s max tasks per node', maxTasksPerNode ?? '<n/a>')
45-
console.log(' %s retrieval tasks', retrievalTasks.length)
46-
if (maxTasksPerNode) this.#maxTasksPerNode = maxTasksPerNode
47-
48-
const retrieval = retrievalTasks[Math.floor(Math.random() * retrievalTasks.length)]
49-
console.log({ retrieval })
40+
const retrieval = await this.#tasker.next()
41+
if (retrieval) {
42+
console.log({ retrieval })
43+
}
5044
return retrieval
5145
}
5246

@@ -190,7 +184,11 @@ export default class Spark {
190184
}
191185

192186
async nextRetrieval () {
193-
const { id: retrievalId, ...retrieval } = await this.getRetrieval()
187+
const retrieval = await this.getRetrieval()
188+
if (!retrieval) {
189+
console.log('Completed all tasks for the current round. Waiting for the next round to start.')
190+
return
191+
}
194192

195193
const stats = newStats()
196194

@@ -211,7 +209,7 @@ export default class Spark {
211209
this.handleRunError(err)
212210
}
213211
const duration = Date.now() - started
214-
const baseDelay = APPROX_ROUND_LENGTH_IN_MS / this.#maxTasksPerNode
212+
const baseDelay = APPROX_ROUND_LENGTH_IN_MS / this.#tasker.maxTasksPerRound
215213
const delay = baseDelay - duration
216214
if (delay > 0) {
217215
console.log('Sleeping for %s seconds before starting the next task...', Math.round(delay / 1000))
@@ -320,16 +318,3 @@ function mapErrorToStatusCode (err) {
320318
// Fallback code for unknown errors
321319
return 600
322320
}
323-
324-
async function assertOkResponse (res, errorMsg) {
325-
if (res.ok) return
326-
327-
let body
328-
try {
329-
body = await res.text()
330-
} catch {}
331-
const err = new Error(`${errorMsg ?? 'Fetch failed'} (${res.status}): ${body.trimEnd()}`)
332-
err.statusCode = res.status
333-
err.serverMessage = body
334-
throw err
335-
}

lib/tasker.js

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/* global Zinnia */
2+
3+
import { ActivityState } from './activity-state.js'
4+
import { encodeHex } from '../vendor/deno-deps.js'
5+
import { assertOkResponse, assertRedirectResponse } from './http-assertions.js'
6+
import { getRandomnessForSparkRound } from './drand-client.js'
7+
import { assertEquals, assertInstanceOf } from 'zinnia:assert'
8+
9+
/** @typedef {{cid: string; minerId: string;}} RetrievalTask */
10+
/** @typedef {RetrievalTask & { key: string}} KeyedRetrievalTask */
11+
12+
/**
 * Keeps track of the current SPARK round and hands out the retrieval tasks
 * selected for this node, one at a time.
 */
export class Tasker {
  /** Location of the round whose tasks were successfully loaded last */
  #lastRoundUrl
  /** @type {RetrievalTask[]} */
  #remainingRoundTasks
  #fetch
  #activity

  /**
   * @param {object} args
   * @param {globalThis.fetch} args.fetch
   * @param {ActivityState} args.activityState
   */
  constructor ({
    fetch = globalThis.fetch,
    activityState = new ActivityState()
  } = {}) {
    this.#fetch = fetch
    this.#activity = activityState

    // Default used until the round details report a usable value
    this.maxTasksPerRound = 360

    // TODO: persist these two values across module restarts
    this.#lastRoundUrl = 'unknown'
    this.#remainingRoundTasks = []
  }

  /**
   * Refresh the round state and take the next task from the remaining ones.
   *
   * @returns {Promise<RetrievalTask | undefined>} `undefined` when there are
   *   no tasks left for the current round
   */
  async next () {
    await this.#updateCurrentRound()
    return this.#remainingRoundTasks.pop()
  }

  /**
   * Ask the API which round is current. When it differs from the round loaded
   * last, fetch its details and pick the tasks for this node.
   */
  async #updateCurrentRound () {
    console.log('Checking the current SPARK round...')
    let res = await this.#fetch('https://api.filspark.com/rounds/current', {
      method: 'GET',
      headers: { 'Content-Type': 'application/json' },
      // The redirect target identifies the round, so we must see it ourselves
      redirect: 'manual',
      signal: AbortSignal.timeout(10_000)
    })

    const roundUrl = await assertRedirectResponse(res, 'Failed to find the URL of the current SPARK round')
    this.#activity.onHealthy()
    if (roundUrl === this.#lastRoundUrl) {
      console.log('Round did not change since the last iteration')
      return
    }

    console.log('Fetching round details at location %s', roundUrl)
    // Resolve against the API origin so that both relative and absolute
    // Location values produce a valid URL.
    const detailsUrl = new URL(roundUrl, 'https://api.filspark.com').href
    res = await this.#fetch(detailsUrl, {
      method: 'GET',
      headers: { 'Content-Type': 'application/json' },
      signal: AbortSignal.timeout(10_000)
    })
    await assertOkResponse(res, 'Failed to fetch the current SPARK round')
    const { retrievalTasks, maxTasksPerNode, ...round } = await res.json()
    console.log('Current SPARK round:', round)
    console.log('  %s max tasks per round', maxTasksPerNode ?? '<n/a>')
    console.log('  %s retrieval tasks', retrievalTasks.length)
    // Keep the previous value when the API does not report a usable limit;
    // otherwise the task picker and the delay computation would receive
    // `undefined`.
    if (typeof maxTasksPerNode === 'number' && maxTasksPerNode > 0) {
      this.maxTasksPerRound = maxTasksPerNode
    }

    const randomness = await getRandomnessForSparkRound(round.startEpoch)
    console.log('  randomness: %s', randomness)

    this.#remainingRoundTasks = await pickTasksForNode({
      tasks: retrievalTasks,
      maxTasksPerRound: this.maxTasksPerRound,
      randomness,
      stationId: Zinnia.stationId
    })

    // Remember the round only after its tasks were loaded. If any step above
    // fails, the next call retries instead of reporting "round did not change"
    // and leaving the node without tasks.
    this.#lastRoundUrl = roundUrl
  }
}
87+
88+
// Shared encoder that turns strings into UTF-8 bytes before they are hashed.
const textEncoder = new TextEncoder()
89+
90+
/**
 * Compute a numeric key for a task in a given round by hashing the task's
 * CID, miner ID and the round randomness with SHA-256.
 *
 * @param {RetrievalTask} task
 * @param {string} randomness
 * @returns {Promise<bigint>} The digest interpreted as a hexadecimal number
 */
export async function getTaskKey (task, randomness) {
  assertEquals(typeof task, 'object', 'task must be an object')
  // `typeof null` is also 'object'. Reject null explicitly so that callers get
  // an assertion failure instead of a TypeError from the property reads below.
  assertEquals(task === null, false, 'task must be an object')
  assertEquals(typeof task.cid, 'string', 'task.cid must be a string')
  assertEquals(typeof task.minerId, 'string', 'task.minerId must be a string')
  assertEquals(typeof randomness, 'string', 'randomness must be a string')

  // Newline-separated fields keep the boundaries between values unambiguous
  const data = [task.cid, task.minerId, randomness].join('\n')
  const hash = await crypto.subtle.digest('sha-256', textEncoder.encode(data))
  return BigInt('0x' + encodeHex(hash))
}
105+
106+
/**
 * Compute a numeric key for a station by hashing its identifier with SHA-256.
 *
 * @param {string} stationId
 * @returns {Promise<bigint>} The digest interpreted as a hexadecimal number
 */
export async function getStationKey (stationId) {
  assertEquals(typeof stationId, 'string', 'stationId must be a string')

  const idBytes = textEncoder.encode(stationId)
  const digest = await crypto.subtle.digest('sha-256', idBytes)
  const hexDigest = encodeHex(digest)
  return BigInt('0x' + hexDigest)
}
115+
116+
/**
 * Select the tasks a station should perform in a round.
 *
 * Every task gets a key derived from the task and the round randomness. Tasks
 * are ordered by the XOR of their key with the station key, smallest first,
 * and the first `maxTasksPerRound` of them are returned.
 *
 * @param {object} args
 * @param {RetrievalTask[]} args.tasks
 * @param {string} args.stationId
 * @param {string} args.randomness
 * @param {number} args.maxTasksPerRound
 * @returns {Promise<RetrievalTask[]>} Copies of the selected tasks
 */
export async function pickTasksForNode ({ tasks, stationId, randomness, maxTasksPerRound }) {
  assertInstanceOf(tasks, Array, 'tasks must be an array')
  assertEquals(typeof stationId, 'string', 'stationId must be a string')
  assertEquals(typeof randomness, 'string', 'randomness must be a string')
  assertEquals(typeof maxTasksPerRound, 'number', 'maxTasksPerRound must be a number')

  const candidates = await Promise.all(tasks.map(async (task) => {
    const entry = { ...task }
    entry.key = await getTaskKey(task, randomness)
    return entry
  }))
  const stationKey = await getStationKey(stationId)

  /**
   * Order entries by the XOR distance between their key and the station key.
   *
   * @param {{key: bigint}} left
   * @param {{key: bigint}} right
   * @returns {number}
   */
  const byDistanceToStation = (left, right) => {
    const leftDistance = left.key ^ stationKey
    const rightDistance = right.key ^ stationKey
    if (leftDistance > rightDistance) return 1
    if (leftDistance < rightDistance) return -1
    return 0
  }

  candidates.sort(byDistanceToStation)
  const selected = candidates.slice(0, maxTasksPerRound)

  // Strip the helper key so callers receive plain task objects
  return selected.map((entry) => {
    const { key, ...task } = entry
    return task
  })
}

test/drand-client.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { test } from 'zinnia:test'
22
import { assertEquals } from 'zinnia:assert'
3-
import { getRandomnessForSparkRound } from '../lib/drand-client.js'
3+
import { getRandomnessForSparkRound } from '../lib/drand-client.js'
44

55
test('getRandomnessForSparkRound', async () => {
66
const randomness = await getRandomnessForSparkRound(4111111)

test/spark.js

+33-11
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,33 @@ const KNOWN_CID = 'bafkreih25dih6ug3xtj73vswccw423b56ilrwmnos4cbwhrceudopdp5sq'
1010
test('getRetrieval', async () => {
1111
const round = {
1212
roundId: '123',
13+
startEpoch: 4111111,
14+
maxTasksPerNode: 1,
1315
retrievalTasks: [
1416
{
1517
cid: 'bafkreidysaugf7iuvemebpzwxxas5rctbyiryykagup2ygkojmx7ag64gy',
16-
providerAddress: '/ip4/38.70.220.96/tcp/10201/p2p/12D3KooWSekjEqdSeHXkpQraVY2STL885svgmh6r2zEFHQKeJ3KD',
17-
protocol: 'graphsync'
18+
minerId: 'f010'
1819
},
1920
{
2021
cid: 'QmUMpWycKJ7GUDJp9GBRX4qWUFUePUmHzri9Tm1CQHEzbJ',
21-
providerAddress: '/dns4/elastic.dag.house/tcp/443/wss/p2p/QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC',
22-
protocol: 'bitswap'
22+
minerId: 'f020'
2323
}
2424
]
2525
}
2626
const requests = []
2727
const fetch = async (url, allOpts) => {
2828
const { signal, ...opts } = allOpts
2929
requests.push({ url, opts })
30+
if (url === 'https://api.filspark.com/rounds/current') {
31+
const headers = new Headers()
32+
headers.set('location', '/rounds/meridian/0x84607/115')
33+
return {
34+
status: 302,
35+
ok: false,
36+
headers
37+
}
38+
}
39+
3040
return {
3141
status: 200,
3242
ok: true,
@@ -37,14 +47,26 @@ test('getRetrieval', async () => {
3747
}
3848
const spark = new Spark({ fetch })
3949
const retrieval = await spark.getRetrieval()
40-
assertArrayIncludes(round.retrievalTasks, [retrieval])
41-
assertEquals(requests, [{
42-
url: 'https://api.filspark.com/rounds/current',
43-
opts: {
44-
method: 'GET',
45-
headers: { 'Content-Type': 'application/json' }
50+
assertArrayIncludes(round.retrievalTasks.map(JSON.stringify), [retrieval].map(JSON.stringify))
51+
assertEquals(requests, [
52+
{
53+
url: 'https://api.filspark.com/rounds/current',
54+
opts: {
55+
method: 'GET',
56+
redirect: 'manual',
57+
headers: { 'Content-Type': 'application/json' }
58+
}
59+
},
60+
{
61+
url: 'https://api.filspark.com/rounds/meridian/0x84607/115',
62+
opts: {
63+
headers: {
64+
'Content-Type': 'application/json'
65+
},
66+
method: 'GET'
67+
}
4668
}
47-
}])
69+
])
4870
})
4971

5072
test('fetchCAR - http', async () => {

0 commit comments

Comments
 (0)