Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throws errors when parsing invalid JSON #9

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,3 @@ Will output:
{ a: 42 }
{ hello: 'world' }
```

If invalid JSON gets written, it's silently ignored.
83 changes: 51 additions & 32 deletions lib/json-stream.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,62 @@
var util = require('util'),
TransformStream = require('stream').Transform;
const util = require('util');
const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
util.inherits(JSONStream, Transform);

module.exports = function (options) {
return new JSONStream(options);
};

var JSONStream = module.exports.JSONStream = function (options) {
// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONStream(options) {
options = options || {};
TransformStream.call(this, options);
Transform.call(this, options);
this._writableState.objectMode = false;
this._readableState.objectMode = true;
this._async = options.async || false;
};
util.inherits(JSONStream, TransformStream);

JSONStream.prototype._transform = function (data, encoding, callback) {
if (!Buffer.isBuffer(data)) data = new Buffer(data);
if (this._buffer) {
data = Buffer.concat([this._buffer, data]);
this._buffer = '';
this._decoder = new StringDecoder('utf8');
this.async = options && options.async? setImmediate: function (fn) { fn(); };
}

JSONStream.prototype._transform = function(chunk, encoding, cb) {
this._buffer += this._decoder.write(chunk);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcrugzz so you reckon this bit should do buffer concat, not string concat?

// split on newlines
var lines = this._buffer.split(/\r?\n/);
// keep the last partial line buffered
this._buffer = lines.pop();
for (var l = 0; l < lines.length; l++) {
var line = lines[l];
this._pushLine(line);
}
cb();
};

var ptr = 0, start = 0;
while (++ptr <= data.length) {
if (data[ptr] === 10 || ptr === data.length) {
var line;
try {
line = JSON.parse(data.slice(start, ptr));
}
catch (ex) { }
if (line) {
this.push(line);
line = null;
}
if (data[ptr] === 10) start = ++ptr;
JSONStream.prototype._pushLine = function(line) {
var self = this;

if (line.trim()) {
try {
var obj = JSON.parse(line);
} catch (er) {
this.async(function () {
self.emit('error', er);
});
return;
}
// push the parsed object out to the readable consumer

this.async(function () {
self.push(obj);
});
}
}

JSONStream.prototype._flush = function(cb) {
// Just handle any leftover
var rem = this._buffer.trim();
this._pushLine(rem);
this.async(function () {
cb();
});
};

this._buffer = data.slice(start);
return this._async
? void setImmediate(callback)
: void callback();
module.exports = function (options) {
return new JSONStream(options);
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"author": "Maciej Małecki <me@mmalecki.com>",
"main": "./lib/json-stream",
"scripts": {
"test": "node test/pipe-test.js && node test/json-stream-test.js && node test/throw-in-readable-test.js"
"test": "node test/pipe-test.js && node test/json-stream-test.js && node test/throw-in-readable-test.js && node test/json-error-test.js"
},
"repository": {
"type": "git",
Expand Down
30 changes: 30 additions & 0 deletions test/json-error-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
var assert = require('assert');
var JSONStream = require('../');

var stream = JSONStream();

var objects = [];

stream.on('data', function (data) {
objects.push(data);
});

stream.on('error', function (error) {
objects.push(error);
assert.equal(error.message, 'Unexpected token t');
});

stream.on('end', function () {
assert.equal(objects.length, 3);
assert.deepEqual(objects[0], {"this": "is valid JSON"});
assert.equal(objects[1].message, 'Unexpected token t');
assert.deepEqual(objects[2], ["this", "is", "valid", "JSON"]);
});

stream.write('{"this": "is valid JSON"}\n');
try {
stream.write('{this is not valid JSON]\n');
} catch (e) {
}
stream.write('["this", "is", "valid", "JSON"]\n');
stream.end();
4 changes: 2 additions & 2 deletions test/json-stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ write(stream, '{"a":', '42', ',"b": 1337', '}\n{"hel', 'lo": "wor', 'ld"}\n');

stream = JSONStream();
expect(stream, [ { a: 42 }, { hello: 'world' } ]);
write(stream, '{"a":', '42}\n{ blah blah blah }\n{"hel', 'lo": "wor', 'ld"}\n');
write(stream, '{"a":', '42}\n\n{"hel', 'lo": "wor', 'ld"}\n');

stream = JSONStream();
expect(stream, [ { a: 42 }, { hello: 'world' } ]);
write(stream, '{"a":', '42}\n{ blah blah', 'blah }\n{"hel', 'lo": "wor', 'ld"}\n');
write(stream, '{"a":', '42}\n{"hel', 'lo": "wor', 'ld"}\n');

stream = JSONStream();
expect(stream, [ { å: '⇢ utf8!', b: 1337 } ]);
Expand Down
2 changes: 1 addition & 1 deletion test/pipe-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ source._read = function () {
};
source.pipe(dest);
source.push('{"a": 4');
source.push('2}\nblah');
source.push('2}');
source.push('\n{"hello"');
source.push(': "world"}\n');
source.push(null);