Support async function for readFileFn
Since implementing a good concurrency handler may be painful, we
provide a promise-based read-locking one so that asyncReadFileFn
is safe to use out of the box.

Given that stream reads are often unable to compute exactly their
final number of bytes to upload, the number of chunks may also be off.
We handle this safely via FlowChunk.readBytes:
- if the chunk was read using an async function,
- and the previous chunk read fewer bytes than usual (including 0),
- and the current chunk read no bytes,
- then assume that this chunk is in fact superfluous and simulate its completion.

Previous discussion on:
* flowjs#42
* flowjs#318
(In the long term, Flow.js may have to support the concept of an "approximate chunk number".)
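
For illustration, a minimal asynchronous reader compatible with the new option could look like the sketch below. It mirrors the Streamer class used in test/asyncSpec.js; the StreamReader name and the use of flowObj.file.stream() are assumptions made for this example, not part of the library.

// Sketch only: sequential reads from the File's ReadableStream.
class StreamReader {
  constructor() {
    this.reader = null;
  }

  // Used as opts.initFileFn: obtain one reader per FlowFile.
  init(flowObj) {
    this.reader = flowObj.file.stream().getReader();
  }

  // Used as opts.asyncReadFileFn(fileObj, startByte, endByte, fileType, chunk).
  // Flow.js serializes these calls through a per-chunk DeferredPromise lock,
  // so reads never run concurrently or out of order.
  async read(fileObj, startByte, endByte, fileType, chunk) {
    const {value, done} = await this.reader.read();
    if (done || !value || value.length === 0) {
      return null; // nothing read: the chunk may be flagged as superfluous
    }
    return new Blob([value], {type: 'application/octet-stream'});
  }
}

Note that a single reader.read() may return more than one chunk's worth of data; a production reader would buffer and slice it, as the test's Streamer does.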
Raphaël Droz committed Oct 15, 2020
1 parent 9210467 commit 52404d0
Showing 4 changed files with 317 additions and 6 deletions.
19 changes: 19 additions & 0 deletions src/DeferredPromise.js
@@ -0,0 +1,19 @@
export default class DeferredPromise {
// https://stackoverflow.com/a/47112177
constructor() {
this.resolved = false;
this._promise = new Promise((resolve, reject) => {
// assign the resolve and reject functions to `this`
// making them usable on the class instance
this.resolve = () => {
this.resolved = true;
return resolve();
};
this.reject = reject;
});
// bind `then` and `catch` to implement the same interface as Promise
this.then = this._promise.then.bind(this._promise);
this.catch = this._promise.catch.bind(this._promise);
this[Symbol.toStringTag] = 'Promise';
}
};
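
As a usage sketch (the names previousRead and readNext are hypothetical), an instance can be awaited before resolve() is called, which is exactly how FlowChunk serializes stream reads:

// Sketch only: gate a later operation on an earlier one.
const previousRead = new DeferredPromise();

async function readNext() {
  await previousRead;             // waits until resolve() is called elsewhere
  // ...safe to touch the shared stream reader here...
}

readNext();                       // starts waiting
previousRead.resolve();           // the earlier read finished; readNext() proceeds
console.log(previousRead.resolved); // true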
4 changes: 3 additions & 1 deletion src/Flow.js
@@ -32,6 +32,7 @@ import {each, async, arrayRemove, extend, webAPIFileRead} from './tools';
* @param {Array.<number>} [opts.successStatuses]
* @param {Function} [opts.initFileFn]
* @param {Function} [opts.readFileFn]
* @param {Function} [opts.asyncReadFileFn]
* @param {Function} [opts.generateUniqueIdentifier]
* @constructor
*/
@@ -91,7 +92,8 @@ export default class Flow {
successStatuses: [200, 201, 202],
onDropStopPropagation: false,
initFileFn: null,
readFileFn: webAPIFileRead,
asyncReadFileFn: null
};

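
As a configuration sketch (the streamer object is assumed to expose init()/read() like the Streamer in test/asyncSpec.js below), the new option is wired in alongside initFileFn:

// Sketch only: enabling the asynchronous read path.
var flow = new Flow({
  chunkSize: 64 * 1024,
  forceChunkSize: true,
  simultaneousUploads: 4,
  initFileFn: streamer.init.bind(streamer),
  asyncReadFileFn: streamer.read.bind(streamer)
});
flow.addFile(someFile);   // someFile: a File obtained from an <input> or a drop event
flow.upload();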
117 changes: 112 additions & 5 deletions src/FlowChunk.js
@@ -1,4 +1,5 @@
import {evalOpts, each, extend} from './tools';
import DeferredPromise from './DeferredPromise';

/**
* Class for storing a single chunk
@@ -60,6 +61,20 @@ export default class FlowChunk {
*/
this.readState = 0;

/**
* Mostly for streams: how many bytes were actually read
* @type {number} -1 = not read
*/
this.readBytes = -1;

/**
* File-level read state.
* When reading from a stream we can't slice a known-size buffer into chunks;
* chunks are constructed sequentially from blocking reads. Each chunk holds such a
* promise recording whether its read has completed, so that later chunks can wait for it.
* @type {DeferredPromise}
*/
this.readStreamState = new DeferredPromise();

/**
* Bytes transferred from total request size
@@ -241,13 +256,13 @@
* Finish preprocess state
* @function
*/
async preprocessFinished() {
// Re-compute the endByte after the preprocess function to allow an
// implementer of preprocess to set the fileObj size
this.endByte = this.computeEndByte();

this.preprocessState = 2;
await this.send();
}

/**
@@ -261,12 +276,88 @@
}

/**
* The asyncReadFileFn() helper provides the ability to read() asynchronously,
* e.g. when reading from a ReadableStream.getReader().
*
* But:
* - FlowChunk.send() can be called up to {simultaneousUploads} times.
* - Concurrent or misordered read() calls would result in a corrupted payload.
*
* This function guards against that: as soon as a previous chunk exists and as long as
* that previous chunk has not been fully read(), we assume the corresponding reader is
* unavailable and wait for it.
* @function
*/
async readStreamGuard() {
var previousReads = this.fileObj.chunks.slice(0, this.offset).map(e => e.readStreamState);
try {
await Promise.all(previousReads);
} catch (err) {
console.error(`Chunk ${this.offset}: error while waiting for the ${previousReads.length} previous chunks to be read.`);
throw err;
}
}

async readStreamChunk() {
if (this.readStreamState.resolved) {
// This is normally impossible to reach. Has it been uploaded?
console.warn(`Chunk ${this.offset} already read. Bytes = ${this.bytes ? this.bytes.size : null}. xhr initialized = ${this.xhr ? 1 : 0}`);
// We may want to retry (or not) the upload, but we must never read from the stream again or we risk misordered chunks.
return;
}

await this.readStreamGuard();
var bytes, asyncRead = this.flowObj.opts.asyncReadFileFn;

bytes = await asyncRead(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
this.readStreamState.resolve();

// Equivalent to readFinished()
this.readState = 2;

if (bytes) {
this.readBytes = (bytes.size || bytes.size === 0) ? bytes.size : -1;
}

if (bytes && bytes.size > 0) {
if (this.chunkSize && bytes.size > this.chunkSize) {
// More data than expected may imply a miscalculation of the total number of chunks.
console.warn(`Chunk ${this.offset}: returned too much data. Got ${bytes.size}. Expected no more than ${this.chunkSize}.`);
}
this.bytes = bytes;
this.xhrSend(bytes);
return;
}

if (this.offset > 0) {
// last size of the buffer read for the previous chunk
var lastReadBytes = this.fileObj.chunks[this.offset - 1].readBytes;
if (lastReadBytes < parseInt(this.chunkSize)) {
console.warn(`Chunk ${this.offset} seems superfluous. No bytes were read() while the previous chunk was only ${lastReadBytes} bytes instead of ${this.chunkSize}.`);
// The previous chunk's buffer wasn't even full. That means the number of chunks may
// have been miscomputed and this chunk is superfluous.
// We fake a successful request so that the overall status is "complete" and we can
// move on with this FlowFile.
this.pendingRetry = false;
this.xhr = {readyState: 5, status: 1 };
this.doneHandler(null);
return;
}
}

console.warn(`Chunk ${this.offset}: no bytes to read()`);
this.pendingRetry = false;
}

/**
* Prepare the data (preprocess/read), then call xhrSend()
* @function
*/
async send() {
var preprocess = this.flowObj.opts.preprocess;
var read = this.flowObj.opts.readFileFn;
var asyncRead = this.flowObj.opts.asyncReadFileFn;

if (typeof preprocess === 'function') {
switch (this.preprocessState) {
case 0:
@@ -277,6 +368,13 @@
return;
}
}

if (asyncRead) {
this.readState = 1;
await this.readStreamChunk();
return;
}

switch (this.readState) {
case 0:
this.readState = 1;
@@ -285,6 +383,15 @@
case 1:
return;
}

this.xhrSend(this.bytes);
}

/**
* Actually uploads data in a POST call
* @function
*/
xhrSend(bytes) {
if (this.flowObj.opts.testChunks && !this.tested) {
this.test();
return;
@@ -301,7 +408,7 @@
this.xhr.addEventListener('error', this.doneHandler.bind(this), false);

var uploadMethod = evalOpts(this.flowObj.opts.uploadMethod, this.fileObj, this);
var data = this.prepareXhrRequest(uploadMethod, false, this.flowObj.opts.method, bytes);
var changeRawDataBeforeSend = this.flowObj.opts.changeRawDataBeforeSend;
if (typeof changeRawDataBeforeSend === 'function') {
data = changeRawDataBeforeSend(this, data);
183 changes: 183 additions & 0 deletions test/asyncSpec.js
@@ -0,0 +1,183 @@
describe('upload stream', function() {
/**
* @type {Flow}
*/
var flow;
/**
* @type {FakeXMLHttpRequest}
*/
var xhr_server;

var random_sizes = false;

/**
* Generate an ASCII file composed of <num> parts, each <segment_size> characters long.
* The character of each part is randomly chosen from the alphabet below.
*/
function gen_file(num, segment_size = 64) {
var alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789()_-?!./|';
return alphabet
.repeat(Math.ceil(num / alphabet.length))
.split('')
.sort(() => Math.random()-0.5)
.map((v, i) => i < num ? v.repeat(segment_size) : null)
.filter(e => e)
.join('');
}

function hash(content) {
return window.crypto.subtle.digest('SHA-256', new TextEncoder('utf-8').encode(content));
}

function hex(buff) {
return [].map.call(new Uint8Array(buff), b => ('00' + b.toString(16)).slice(-2)).join('');
}

class Streamer {
constructor(chunk_size) {
this._reader = null;
this.chunk_size = chunk_size;

// See the comment in read() for why we implement a custom reader here.
this.buffer = null;
this.index = 0;
};

init(flowObj) {
this._reader = flowObj.file.stream().getReader();
};

async read(flowObj, startByte, endByte, fileType, chunk) {
// chunk._log(`Start reading from ${this.buffer !== null ? 'existing' : 'the'} buffer`);
if (this.buffer === null) {
// console.log(`[asyncRead ${chunk.offset}] no preexisting buffer => reader.read()`);
/*
Here we would expect a partial read of 64kB (by implementation), but it seems that
*all* of the buffer is returned, which makes it difficult to base a test on
ReadableStreamDefaultReader() behavior. As such we simulate partial reads ourselves.
*/
const {value: buffer, done} = await this._reader.read();
this.buffer = buffer;

if (buffer) {
// console.log(`[asyncRead ${chunk.offset}] got a buffer of ${buffer.length} bytes...`);
} else {
// console.log(`[asyncRead ${chunk.offset}] no buffer[bail]`);
return null;
}
}

if (this.buffer.length === 0) {
// console.log(`[asyncRead ${chunk.offset}] this.buffer is null[bail]`);
return null;
}

// console.log(`[asyncRead ${chunk.offset}] Read slice[${this.index}:${this.index + this.chunk_size}] a buffer of ${this.buffer.length} bytes`);
var buffer_chunk = this.buffer.slice(this.index, this.index + this.chunk_size);

if (!buffer_chunk) {
// console.log(`[asyncRead ${chunk.offset}] null slice`);
// console.log(buffer_chunk);
} else {
// chunk._log(`Read slice of ${buffer_chunk.length} bytes`);
this.index += this.chunk_size;
return new Blob([buffer_chunk], {type: 'application/octet-stream'});
}

return null;
};
}

beforeAll(function() {
jasmine.DEFAULT_TIMEOUT_INTERVAL = 5000;

xhr_server = sinon.createFakeServer({
// autoRespondAfter: 50
respondImmediately: true,
});
});


afterAll(function() {
xhr_server.restore();
});

beforeEach(function () {
// jasmine.clock().install();

flow = new Flow({
progressCallbacksInterval: 0,
forceChunkSize: true,
testChunks: false,
generateUniqueIdentifier: function (file) {
return file.size;
}
});

xhr_server.respondWith('ok');
});

afterEach(function () {
// jasmine.clock().uninstall();
xhr_server.restore();
});

it('synchronous initFileFn and asyncReadFileFn', async function (done) {
var chunk_size, chunk_num, simultaneousUploads, upload_chunk_size;

if (random_sizes) {
chunk_size = Math.ceil(Math.random() * 30),
chunk_num = Math.ceil(Math.random() * 100),
simultaneousUploads = Math.ceil(Math.random() * 20),
upload_chunk_size = Math.max(1, Math.ceil(Math.random() * chunk_size));
} else {
chunk_size = 23,
chunk_num = 93,
simultaneousUploads = 17,
upload_chunk_size = Math.max(1, Math.ceil(Math.random() * chunk_size));
}

var content = gen_file(chunk_num, chunk_size),
orig_hash = hex(await hash(content)),
sample_file = new File([content], 'foobar.bin');

console.info(`Test file is ${content.length} bytes long (sha256: ${orig_hash}).`);
console.info(`Now uploading ${simultaneousUploads} simultaneous chunks of at most ${upload_chunk_size} bytes each`);

flow.on('fileError', jasmine.createSpy('error'));
flow.on('fileSuccess', jasmine.createSpy('success'));
flow.on('complete', () => {
validate(done, content, orig_hash);
});

var streamer = new Streamer(upload_chunk_size); // chunk_size);
flow.opts.chunkSize = upload_chunk_size;
flow.opts.simultaneousUploads = simultaneousUploads;
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.addFile(sample_file);
flow.upload();
});

function validate(done, content, orig_hash) {
var predicted_request_number = Math.ceil(content.length / flow.opts.chunkSize);
expect(xhr_server.requests.length).toBe(predicted_request_number);

var file = flow.files[0];
expect(file.progress()).toBe(1);
expect(file.isUploading()).toBe(false);
expect(file.isComplete()).toBe(true);

// An array of promises, each resolving to the corresponding request's body (= payload)
var payload_contents = xhr_server.requests.map(x => x.requestBody.get('file').text());
Promise.all(payload_contents)
.then(values => hash(values.join('')))
.then(hash => hex(hash))
.then(hexhash => {
// console.log(orig_hash, hexhash);
expect(hexhash).toBe(orig_hash);
done();
});
}
});
