Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace fast-atomic-write with steno #285

2 changes: 1 addition & 1 deletion packages/blockstore-fs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@
"dep-check": "aegir dep-check"
},
"dependencies": {
"@sgtpooki/steno-patched": "^3.1.1",
"blockstore-core": "^4.0.0",
"fast-write-atomic": "^0.2.0",
"interface-blockstore": "^5.0.0",
"interface-store": "^5.0.0",
"it-glob": "^2.0.1",
Expand Down
24 changes: 15 additions & 9 deletions packages/blockstore-fs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { promisify } from 'node:util'
import { Writer as StenoWriter } from '@sgtpooki/steno-patched'
import {
Errors
} from 'blockstore-core'
// @ts-expect-error no types
import fwa from 'fast-write-atomic'
import glob from 'it-glob'
import map from 'it-map'
import parallelBatch from 'it-parallel-batch'
Expand All @@ -28,20 +26,19 @@ import type { Blockstore, Pair } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
import type { CID } from 'multiformats/cid'

const writeAtomic = promisify(fwa)

/**
* Write a file atomically
*/
async function writeFile (file: string, contents: Uint8Array): Promise<void> {
async function writeFile (writer: StenoWriter, file: string, contents: Uint8Array): Promise<void> {
try {
await writeAtomic(file, contents)
await writer.write(contents)
} catch (err: any) {
if (err.code === 'EPERM' && err.syscall === 'rename') {
// fast-write-atomic writes a file to a temp location before renaming it.
// steno writes a file to a temp location before renaming it.
// On Windows, if the final file already exists this error is thrown.
// No such error is thrown on Linux/Mac
// Make sure we can read & write to this file
// 2023-12-14: Is this still needed with steno?
await fs.access(file, fs.constants.F_OK | fs.constants.W_OK)

// The file was created by another context - this means there were
Expand Down Expand Up @@ -106,6 +103,7 @@ export class FsBlockstore implements Blockstore {
private readonly getManyConcurrency: number
private readonly deleteManyConcurrency: number
private readonly shardingStrategy: ShardingStrategy
private readonly writers = new Map<string, StenoWriter>()

constructor (location: string, init: FsBlockstoreInit = {}) {
this.path = path.resolve(location)
Expand Down Expand Up @@ -151,8 +149,16 @@ export class FsBlockstore implements Blockstore {
recursive: true
})
}
const filePath = path.join(this.path, dir, file)

let writer = this.writers.get(filePath)
if (writer == null) {
writer = new StenoWriter(filePath)
this.writers.set(filePath, writer)
}

await writeFile(path.join(this.path, dir, file), val)
await writeFile(writer, filePath, val)
this.writers.delete(filePath)

return key
} catch (err: any) {
Expand Down
2 changes: 1 addition & 1 deletion packages/datastore-fs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@
"dep-check": "aegir dep-check"
},
"dependencies": {
"@sgtpooki/steno-patched": "^3.1.1",
"datastore-core": "^9.0.0",
"fast-write-atomic": "^0.2.0",
"interface-datastore": "^8.0.0",
"interface-store": "^5.0.0",
"it-glob": "^2.0.1",
Expand Down
26 changes: 16 additions & 10 deletions packages/datastore-fs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { promisify } from 'util'
import { Writer as StenoWriter } from '@sgtpooki/steno-patched'
import {
BaseDatastore, Errors
} from 'datastore-core'
// @ts-expect-error no types
import fwa from 'fast-write-atomic'
import {
Key, type KeyQuery, type Pair, type Query
} from 'interface-datastore'
Expand All @@ -28,21 +26,20 @@ import map from 'it-map'
import parallel from 'it-parallel-batch'
import type { AwaitIterable } from 'interface-store'

const writeAtomic = promisify(fwa)

/**
* Write a file atomically
*/
async function writeFile (path: string, contents: Uint8Array): Promise<void> {
async function writeFile (writer: StenoWriter, file: string, contents: Uint8Array): Promise<void> {
try {
await writeAtomic(path, contents)
await writer.write(contents)
} catch (err: any) {
if (err.code === 'EPERM' && err.syscall === 'rename') {
// fast-write-atomic writes a file to a temp location before renaming it.
// steno writes a file to a temp location before renaming it.
// On Windows, if the final file already exists this error is thrown.
// No such error is thrown on Linux/Mac
// Make sure we can read & write to this file
await fs.access(path, fs.constants.F_OK | fs.constants.W_OK)
// 2023-12-14: Is this still needed with steno?
await fs.access(file, fs.constants.F_OK | fs.constants.W_OK)

// The file was created by another context - this means there were
// attempts to write the same block by two different function calls
Expand Down Expand Up @@ -76,6 +73,7 @@ export class FsDatastore extends BaseDatastore {
private readonly deleteManyConcurrency: number
private readonly getManyConcurrency: number
private readonly putManyConcurrency: number
private readonly writers = new Map<string, StenoWriter>()

constructor (location: string, init: FsDatastoreInit = {}) {
super()
Expand Down Expand Up @@ -156,7 +154,15 @@ export class FsDatastore extends BaseDatastore {
await fs.mkdir(parts.dir, {
recursive: true
})
await writeFile(parts.file, val)
const filePath = parts.file
let writer = this.writers.get(filePath)
if (writer == null) {
writer = new StenoWriter(filePath)
this.writers.set(filePath, writer)
}

await writeFile(writer, filePath, val)
this.writers.delete(filePath)

return key
} catch (err: any) {
Expand Down