From 272c82499ccecaa19aa64eed463691939e88561a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Droz?=
Date: Thu, 15 Oct 2020 18:48:35 -0300
Subject: [PATCH] Support async function for readFileFn

Since implementing a good concurrency handler may be painful, we provide
a promise-based read-locking implementation so that asyncReadFileFn is
safe to use out of the box.

Given that stream reads often cannot know in advance exactly how many
bytes they will deliver, the computed number of chunks may also be off.
We handle this safely via FlowChunk.readBytes:

- if the file is read using an async function,
- and the previous chunk read fewer bytes than usual (possibly 0),
- and the current chunk read no bytes at all,
- then assume this chunk is superfluous and simulate its completion.

Previous discussion on:
* https://github.com/flowjs/flow.js/issues/42
* https://github.com/flowjs/flow.js/issues/318

(In the long term, Flow.js may have to support the concept of an
"approximate chunk count".)
---
 src/DeferredPromise.js |  19 +++++
 src/Flow.js            |   4 +-
 src/FlowChunk.js       | 121 +++++++++++++++++++++++---
 test/asyncSpec.js      | 187 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 325 insertions(+), 6 deletions(-)
 create mode 100644 src/DeferredPromise.js
 create mode 100644 test/asyncSpec.js

diff --git a/src/DeferredPromise.js b/src/DeferredPromise.js
new file mode 100644
index 00000000..e7bf314b
--- /dev/null
+++ b/src/DeferredPromise.js
@@ -0,0 +1,19 @@
+export default class DeferredPromise {
+  // https://stackoverflow.com/a/47112177
+  constructor() {
+    this.resolved = false;
+    this._promise = new Promise((resolve, reject) => {
+      // assign the resolve and reject functions to `this`,
+      // making them usable on the class instance
+      this.resolve = (value) => {
+        this.resolved = true;
+        return resolve(value);
+      };
+      this.reject = reject;
+    });
+    // bind `then` and `catch` to implement the same interface as Promise
+    this.then = this._promise.then.bind(this._promise);
+    this.catch = this._promise.catch.bind(this._promise);
+    this[Symbol.toStringTag] = 'Promise';
+  }
+};
diff --git a/src/Flow.js b/src/Flow.js
index e1e17d61..2ec13430 100644
--- a/src/Flow.js
+++ b/src/Flow.js
@@ -32,6 +32,7 @@ import {each, async, arrayRemove, extend, webAPIFileRead} from './tools';
  * @param {Array.<number>} [opts.successStatuses]
  * @param {Function} [opts.initFileFn]
  * @param {Function} [opts.readFileFn]
+ * @param {Function} [opts.asyncReadFileFn]
  * @param {Function} [opts.generateUniqueIdentifier]
  * @constructor
  */
@@ -91,7 +92,8 @@
       successStatuses: [200, 201, 202],
       onDropStopPropagation: false,
       initFileFn: null,
-      readFileFn: webAPIFileRead
+      readFileFn: webAPIFileRead,
+      asyncReadFileFn: null
     };
 
     /**
diff --git a/src/FlowChunk.js b/src/FlowChunk.js
index 0bcff137..d72539b8 100644
--- a/src/FlowChunk.js
+++ b/src/FlowChunk.js
@@ -1,4 +1,5 @@
 import {evalOpts, each, extend} from './tools';
+import DeferredPromise from './DeferredPromise';
 
 /**
  * Class for storing a single chunk
@@ -60,6 +61,22 @@ export default class FlowChunk {
      */
     this.readState = 0;
 
+    /**
+     * Mostly for streams: how many bytes were actually read
+     * @type {number} -1 = not read
+     */
+    this.readBytes = -1;
+
+    /**
+     * File-level read state.
+     * When reading from a stream we can't slice a known-size buffer into chunks.
+     * Chunks must instead be read sequentially; this deferred promise tracks
+     * whether this chunk's read has completed.
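+     * Note: as a consequence, a stream's chunks are read strictly in offset
+     * order (enforced by readStreamGuard() below); uploads may still overlap.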
+     * @type {DeferredPromise}
+     */
+    this.readStreamState = new DeferredPromise();
 
     /**
      * Bytes transferred from total request size
@@ -241,13 +258,13 @@
    * Finish preprocess state
    * @function
    */
-  preprocessFinished() {
+  async preprocessFinished() {
     // Re-compute the endByte after the preprocess function to allow an
     // implementer of preprocess to set the fileObj size
     this.endByte = this.computeEndByte();
 
     this.preprocessState = 2;
-    this.send();
+    await this.send();
   }
 
   /**
@@ -261,12 +278,90 @@
   }
 
   /**
-   * Uploads the actual data in a POST call
+   * asyncReadFileFn() helper provides the ability to read() asynchronously,
+   * e.g. when reading from a ReadableStream.getReader().
+   *
+   * But:
+   * - FlowChunk.send() can be called up to {simultaneousUploads} times.
+   * - Concurrent or misordered read() calls would result in a corrupted payload.
+   *
+   * This function guards against that: as soon as a previous chunk exists, and
+   * as long as that chunk is not fully read(), we assume the corresponding
+   * reader is unavailable and wait for it.
    * @function
    */
-  send() {
+  async readStreamGuard() {
+    var map = this.fileObj.chunks.map(e => e.readStreamState).slice(0, this.offset);
+    try {
+      await Promise.all(map);
+    } catch(err) {
+      console.error(`Chunk ${this.offset}: Error while waiting for ${map.length} previous chunks to be read.`);
+      throw(err);
+    }
+  }
+
+  async readStreamChunk() {
+    if (this.readStreamState.resolved) {
+      // This is normally impossible to reach. Has it been uploaded?
+      console.warn(`Chunk ${this.offset} already read. Bytes = ${this.bytes ? this.bytes.size : null}. xhr initialized = ${this.xhr ? 1 : 0}`);
+      // We may want to retry (or not) the upload, but never read from the stream again, or we'd risk misordered chunks.
+      return;
+    }
+
+    await this.readStreamGuard();
+    var bytes, asyncRead = this.flowObj.opts.asyncReadFileFn;
+
+    bytes = await asyncRead(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
+    this.readStreamState.resolve();
+
+    // Equivalent to readFinished()
+    this.readState = 2;
+
+    if (bytes) {
+      this.readBytes = bytes.size || bytes.size === 0 ? bytes.size : -1;
+    }
+
+    if (bytes && bytes.size > 0) {
+      if (this.flowObj.chunkSize) {
+        // This may imply a miscalculation of the total number of chunks.
+        console.warn(`Chunk ${this.offset}: returned too much data. Got ${bytes.size}. Expected not more than ${this.flowObj.chunkSize}.`);
+      }
+      this.bytes = bytes;
+      this.xhrSend(bytes);
+      return;
+    }
+
+    if (this.offset > 0) {
+      // the number of bytes read for the previous chunk
+      var lastReadBytes = this.fileObj.chunks[this.offset - 1].readBytes;
+      if (lastReadBytes < parseInt(this.chunkSize)) {
+        console.warn(`Chunk ${this.offset} seems superfluous: no byte was read() while the previous chunk read only ${lastReadBytes} bytes instead of ${this.chunkSize}`);
+        // The previous chunk's buffer wasn't even full. That means the number of
+        // chunks may have been miscomputed and this chunk is superfluous.
+        // We make a fake request so that the overall status is "complete" and we
+        // can move on with this FlowFile.
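+        // (The fake xhr is never sent; it only needs readyState >= 4 so that
+        // status() stops returning "uploading" and doneHandler() settles the chunk.)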
+        this.pendingRetry = false;
+        this.xhr = {readyState: 5, status: 1};
+        this.doneHandler(null);
+        return;
+      }
+    }
+
+    console.warn(`Chunk ${this.offset}: no byte to read()`);
+    this.pendingRetry = false;
+  }
+
+  /**
+   * Prepare the data (preprocess/read), then call xhrSend()
+   * @function
+   */
+  async send() {
     var preprocess = this.flowObj.opts.preprocess;
     var read = this.flowObj.opts.readFileFn;
+    var asyncRead = this.flowObj.opts.asyncReadFileFn;
+
     if (typeof preprocess === 'function') {
       switch (this.preprocessState) {
         case 0:
@@ -277,6 +372,13 @@
           return;
       }
     }
+
+    if (asyncRead) {
+      this.readState = 1;
+      await this.readStreamChunk();
+      return;
+    }
+
     switch (this.readState) {
       case 0:
         this.readState = 1;
@@ -285,6 +387,15 @@
       case 1:
         return;
     }
+
+    this.xhrSend(this.bytes);
+  }
+
+  /**
+   * Actually uploads the data in a POST call
+   * @function
+   */
+  xhrSend(bytes) {
     if (this.flowObj.opts.testChunks && !this.tested) {
       this.test();
       return;
     }
@@ -301,7 +412,7 @@
     this.xhr.addEventListener('error', this.doneHandler.bind(this), false);
 
     var uploadMethod = evalOpts(this.flowObj.opts.uploadMethod, this.fileObj, this);
-    var data = this.prepareXhrRequest(uploadMethod, false, this.flowObj.opts.method, this.bytes);
+    var data = this.prepareXhrRequest(uploadMethod, false, this.flowObj.opts.method, bytes);
     var changeRawDataBeforeSend = this.flowObj.opts.changeRawDataBeforeSend;
     if (typeof changeRawDataBeforeSend === 'function') {
       data = changeRawDataBeforeSend(this, data);
diff --git a/test/asyncSpec.js b/test/asyncSpec.js
new file mode 100644
index 00000000..5a80d41d
--- /dev/null
+++ b/test/asyncSpec.js
@@ -0,0 +1,187 @@
+describe('upload stream', function() {
+  /**
+   * @type {Flow}
+   */
+  var flow;
+  /**
+   * @type {FakeXMLHttpRequest}
+   */
+  var xhr_server;
+
+  var random_sizes = false;
+
+  /**
+   * Generate an ASCII file composed of `num` segments of `segment_size` characters
+   * each. The character of each segment is randomly chosen from the alphabet below.
+   */
+  function gen_file(num, segment_size = 64) {
+    var alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789()_-?!./|';
+    return alphabet
+      .repeat(Math.ceil(num / alphabet.length))
+      .split('')
+      .sort(() => Math.random() - 0.5)
+      .map((v, i) => i < num ? v.repeat(segment_size) : null)
+      .filter(e => e)
+      .join('');
+  }
+
+  function hash(content) {
+    return window.crypto.subtle.digest('SHA-256', new TextEncoder('utf-8').encode(content));
+  }
+
+  function hex(buff) {
+    return [].map.call(new Uint8Array(buff), b => ('00' + b.toString(16)).slice(-2)).join('');
+  }
+
+  class Streamer {
+    constructor(chunk_size) {
+      this._reader = null;
+      this.chunk_size = chunk_size;
+
+      // See the comment in read() for why we implement a custom reader here.
+      this.buffer = null;
+      this.index = 0;
+    };
+
+    init(flowObj) {
+      this._reader = flowObj.file.stream().getReader();
+    };
+
+    async read(flowObj, startByte, endByte, fileType, chunk) {
+      // chunk._log(`Start reading from ${this.buffer !== null ? 'existing' : 'the'} buffer`);
+      if (this.buffer === null) {
+        // console.log(`[asyncRead ${chunk.offset}] no preexisting buffer => reader.read()`);
+        /*
+          Here we would expect a partial read of 64kb (by implementation) but it
+          seems that *all* of the buffer is returned, making it difficult to base
+          a test on ReadableStreamDefaultReader() behavior. As such we simulate it.
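+          (The simulation: cache the whole buffer on the first read(), then hand
+          out successive chunk_size-sized slices of it on each subsequent call.)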
+ */ + const {value: buffer, done} = await this._reader.read(); + this.buffer = buffer; + + if (buffer) { + // console.log(`[asyncRead ${chunk.offset}] got a buffer of ${buffer.length} bytes...`); + } else { + // console.log(`[asyncRead ${chunk.offset}] no buffer[bail]`); + return null; + } + } + + if (this.buffer.length === 0) { + // console.log(`[asyncRead ${chunk.offset}] this.buffer is null[bail]`); + return null; + } + + // console.log(`[asyncRead ${chunk.offset}] Read slice[${this.index}:${this.index + this.chunk_size}] a buffer of ${this.buffer.length} bytes`); + var buffer_chunk = this.buffer.slice(this.index, this.index + this.chunk_size); + + if (!buffer_chunk) { + // console.log(`[asyncRead ${chunk.offset}] null slice`); + // console.log(buffer_chunk); + } else { + // chunk._log(`Read slice of ${buffer_chunk.length} bytes`); + this.index += this.chunk_size; + return new Blob([buffer_chunk], {type: 'application/octet-stream'}); + } + + return null; + }; + } + + beforeAll(function() { + jasmine.DEFAULT_TIMEOUT_INTERVAL = 5000; + + xhr_server = sinon.createFakeServer({ + // autoRespondAfter: 50 + respondImmediately: true, + }); + }); + + + afterAll(function() { + xhr_server.restore(); + }); + + beforeEach(function () { + // jasmine.clock().install(); + + flow = new Flow({ + progressCallbacksInterval: 0, + forceChunkSize: true, + testChunks: false, + generateUniqueIdentifier: function (file) { + return file.size; + } + }); + + xhr_server.respondWith('ok'); + }); + + afterEach(function () { + // jasmine.clock().uninstall(); + xhr_server.restore(); + }); + + it('synchronous initFileFn and asyncReadFileFn', async function (done) { + var chunk_size, chunk_num, simultaneousUploads, upload_chunk_size; + + if (random_sizes) { + chunk_size = Math.ceil(Math.random() * 30), + chunk_num = Math.ceil(Math.random() * 100), + simultaneousUploads = Math.ceil(Math.random() * 20), + upload_chunk_size = Math.max(1, Math.ceil(Math.random() * chunk_size)); + } else { + chunk_size = 23, + chunk_num = 93, + simultaneousUploads = 17, + upload_chunk_size = Math.max(1, Math.ceil(Math.random() * chunk_size)); + } + + var content = gen_file(chunk_num, chunk_size), + orig_hash = hex(await hash(content)), + sample_file = new File([content], 'foobar.bin'); + + console.info(`Test File is ${chunk_num} bytes long (sha256: ${orig_hash}).`); + console.info(`Now uploads ${simultaneousUploads} simultaneous chunks of at most ${upload_chunk_size} bytes`); + + flow.on('fileError', jasmine.createSpy('error')); + flow.on('fileSuccess', jasmine.createSpy('success')); + flow.on('complete', () => { + validate(done, content, orig_hash); + }); + + var streamer = new Streamer(upload_chunk_size); // chunk_size); + flow.opts.chunkSize = upload_chunk_size; + flow.opts.simultaneousUploads = simultaneousUploads; + flow.opts.initFileFn = streamer.init.bind(streamer); + flow.opts.readFileFn = streamer.read.bind(streamer); + flow.opts.asyncReadFileFn = streamer.read.bind(streamer); + flow.addFile(sample_file); + flow.upload(); + }); + + function validate(done, content, orig_hash) { + var predicted_request_number = Math.ceil(content.length / flow.opts.chunkSize); + expect(xhr_server.requests.length).toBe(predicted_request_number); + + var file = flow.files[0]; + expect(file.progress()).toBe(1); + expect(file.isUploading()).toBe(false); + expect(file.isComplete()).toBe(true); + + // An array of promises of obtaining the corresponding request's body (= payload) + var payload_contents = xhr_server.requests.map(x => 
+      x.requestBody.get('file').text());
+
+    Promise.all(payload_contents)
+      .then(values => hash(values.join('')))
+      .then(digest => hex(digest))
+      .then(hexhash => {
+        // console.log(orig_hash, hexhash);
+        expect(hexhash).toBe(orig_hash);
+        done();
+      });
+  }
+});
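Note for reviewers (not part of the patch): the sketch below illustrates how the
new option is meant to be wired from application code. It mirrors the test's
Streamer. The StreamReader class, the '/upload' target and the chunk size are
hypothetical; Blob.stream() support in the browser is assumed.

// Illustrative sketch only, assuming the asyncReadFileFn option added above.
class StreamReader {
  constructor() {
    this._reader = null;  // ReadableStreamDefaultReader, set by init()
    this._buffer = null;  // last Uint8Array returned by the reader
    this._offset = 0;     // read position inside this._buffer
  }

  // initFileFn: called once per FlowFile, before chunking starts.
  init(flowFile) {
    this._reader = flowFile.file.stream().getReader();
  }

  // asyncReadFileFn: returns a Blob, or null once the stream is exhausted
  // (flow.js then detects and completes a superfluous trailing chunk).
  // flow.js serializes these calls per file via readStreamGuard().
  async read(flowFile, startByte, endByte, fileType, chunk) {
    const wanted = endByte - startByte;
    if (this._buffer === null || this._offset >= this._buffer.length) {
      const {value, done} = await this._reader.read();
      if (done || !value) return null;  // nothing left to upload
      this._buffer = value;
      this._offset = 0;
    }
    const slice = this._buffer.slice(this._offset, this._offset + wanted);
    if (slice.length === 0) return null;
    this._offset += slice.length;
    return new Blob([slice], {type: fileType});
  }
}

const reader = new StreamReader();
const flow = new Flow({
  target: '/upload',  // hypothetical endpoint
  chunkSize: 64 * 1024,
  forceChunkSize: true,
  initFileFn: reader.init.bind(reader),
  asyncReadFileFn: reader.read.bind(reader)
});

A reader that returns short slices at buffer boundaries is tolerated: the
FlowChunk.readBytes bookkeeping introduced by this patch lets flow.js detect
and complete a superfluous trailing chunk, as described in the commit message.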