Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions lib/protocol/Writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
var stream, header;
this._streamErrorListeners = [];

this._lobs.forEach((stream) => {
for (var stream of this._lobs) {
if (stream._readableState.errored) {
cb(stream._readableState.errored);
return;
Expand All @@ -141,7 +141,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
self._streamErrorListeners.push(errorListener); // keep track so it can
// be removed later
stream.once('error', errorListener);
});
}

function finalize() {
/* jshint bitwise:false */
Expand All @@ -164,6 +164,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
stream.removeListener('error', onerror);
stream.removeListener('end', onend);
stream.removeListener('readable', onreadable);
stream.removeListener('close', onclose);
}

function onerror(err) {
Expand All @@ -173,6 +174,13 @@ Writer.prototype.finializeParameters = function finializeParameters(
cb(err);
}

function onclose() {
// close events indicate no other events will be emitted, so the data
// was not completely consumed since end was not called
cleanup();
cb(createDestroyedStreamError());
}

function onend() {
/* jshint validthis:true */
cleanup();
Expand Down Expand Up @@ -234,6 +242,8 @@ Writer.prototype.finializeParameters = function finializeParameters(
function next() {
if (!self._lobs.length || bytesRemainingForLOBs <= 0) {
return cb(null);
} else if (self._lobs[0].destroyed) {
return cb(createDestroyedStreamError());
}
// set readable stream
stream = self._lobs[0];
Expand All @@ -248,6 +258,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
stream.on('error', onerror);
stream.on('end', onend);
stream.on('readable', onreadable);
stream.on('close', onclose);
onreadable.call(stream);
}

Expand Down Expand Up @@ -276,18 +287,19 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
var self = this;
var stream, header;

this._lobs.forEach((stream) => {
for (var stream of this._lobs) {
if (stream._errored) {
cb(stream._errored);
return;
}
});
}

function cleanup() {
// remove event listeners
stream.removeListener('error', onerror);
stream.removeListener('end', onend);
stream.removeListener('readable', onreadable);
stream.removeListener('close', onclose);
}

function onerror(err) {
Expand All @@ -297,6 +309,12 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
cb(err);
}

function onclose() {
// close events indicate no other events will be emitted
cleanup();
cb(createDestroyedStreamError());
}

function finalize() {
/* jshint bitwise:false */
// update lob options in header
Expand Down Expand Up @@ -375,6 +393,8 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
// no more lobs to write or not enough bytes remaining for next lob
if (!self._lobs.length || bytesRemaining <= WRITE_LOB_REQUEST_HEADER_LENGTH) {
return cb(null);
} else if (self._lobs[0].destroyed) {
return cb(createDestroyedStreamError());
}
// set reabable stream
stream = self._lobs[0];
Expand All @@ -397,6 +417,7 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
stream.on('error', onerror);
stream.on('end', onend);
stream.on('readable', onreadable);
stream.on('close', onclose);
onreadable.call(stream);
}

Expand Down Expand Up @@ -1117,6 +1138,10 @@ function createReadStreamError() {
return new Error('Chunk length larger than remaining bytes');
}

function createDestroyedStreamError() {
return new Error('Stream was destroyed before data could be completely consumed');
}

function createInvalidLengthError(type) {
return new Error(util.format('Invalid length or indicator value for %s type', type));
}
Expand Down Expand Up @@ -1169,7 +1194,9 @@ util.inherits(LobTransform, Transform);

// Wraps a Readable stream with a stream that is not in object mode
function LobTransform(source, events, options) {
var self = this;
this._source = source;
this._sourceEnded = false;
Transform.call(this, options);
// Forward all events indicated to the LobTransform wrapper
this._proxiedEvents = [];
Expand All @@ -1178,6 +1205,15 @@ function LobTransform(source, events, options) {
source.on(event, listener);
this._proxiedEvents.push({eventName: event, listener: listener});
}
// Destroy this transform if the source is closed before it ends
source.once('close', function () {
if (!self._sourceEnded) {
self.destroy();
}
});
source.once('end', function () {
self._sourceEnded = true;
});
source.pipe(this);
}

Expand All @@ -1192,4 +1228,5 @@ LobTransform.prototype._destroy = function _destroy() {
self._source.removeListener(value.eventName, value.listener);
});
this._source.unpipe(this);
this.emit('close');
}
107 changes: 103 additions & 4 deletions test/acceptance/db.Lob.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var async = require('async');
var stream = require('stream');
var db = require('../db')();
var RemoteDB = require('../db/RemoteDB');
var common = require('../../lib/protocol/common');
var DEFAULT_PACKET_SIZE = common.DEFAULT_PACKET_SIZE;

var describeRemoteDB = db instanceof RemoteDB ? describe : describe.skip;
var isRemoteDB = db instanceof RemoteDB;
Expand Down Expand Up @@ -52,12 +54,12 @@ describe('db', function () {
var client = db.client;
var transaction = client._connection._transaction;

var dirname = path.join(__dirname, '..', 'fixtures', 'img');

describe('IMAGES', function () {
before(db.createImages.bind(db));
after(db.dropImages.bind(db));

var dirname = path.join(__dirname, '..', 'fixtures', 'img');

it('should return all images via callback', function (done) {
var sql = 'select * from images order by NAME';
client.exec(sql, function (err, rows) {
Expand Down Expand Up @@ -237,8 +239,6 @@ describe('db', function () {
}
});

var dirname = path.join(__dirname, '..', 'fixtures', 'img');

function testInsertReadableStream(inputStream, expected, done) {
var statement;
function prepareInsert(cb) {
Expand Down Expand Up @@ -308,7 +308,73 @@ describe('db', function () {
srcStream.pipe(transformStream);
testInsertReadableStream(transformStream, expected, done);
});
});

describeRemoteDB('Quiet close stream', function () {
var preparedStatement;
before(function (done) {
if (isRemoteDB) {
db.createTable.bind(db)('STREAM_BLOB_TABLE', ['A BLOB'], null, function (err) {
if (err) done(err);
client.prepare('INSERT INTO STREAM_BLOB_TABLE VALUES (?)', function (err, stmt) {
if (err) done(err);
preparedStatement = stmt;
done();
});
});
} else {
this.skip();
done();
}
});
after(function (done) {
if (isRemoteDB) {
db.dropTable.bind(db)('STREAM_BLOB_TABLE', function () {
preparedStatement.drop(done);
});
} else {
done();
}
});

function testInsertClosingStream(streamOptions, destroyBefore, expectedErrMessage, done) {
var srcStream = fs.createReadStream(path.join(dirname, "lobby.jpg"));
var transformStream = new AbortTransform(streamOptions);
srcStream.pipe(transformStream);
if (destroyBefore) {
transformStream.destroy();
}
preparedStatement.exec([transformStream], function (err) {
err.should.be.an.instanceof(Error);
err.message.should.equal(expectedErrMessage);
done();
});
}

var quietCloseErrMessage = "Stream was destroyed before data could be completely consumed";

it('should raise a destroyed error when given a destroyed stream', function (done) {
testInsertClosingStream({maxBytes: 50000}, true, quietCloseErrMessage, done);
});

it('should raise a destroyed error during an initial write lob execute', function (done) {
testInsertClosingStream({maxBytes: 50000}, false, quietCloseErrMessage, done);
});

it('should raise a destroyed error when stream is destroyed in between packet sends', function (done) {
testInsertClosingStream({maxBytes: DEFAULT_PACKET_SIZE + 1}, false, quietCloseErrMessage, done);
});

it('should raise a destroyed error when stream is destroyed during write lob request', function (done) {
testInsertClosingStream({maxBytes: DEFAULT_PACKET_SIZE + 1, throttleFlow: true},
false, quietCloseErrMessage, done);
});

it('should raise custom errors provided by user stream', function (done) {
var streamError = new Error("Custom stream error");
testInsertClosingStream({maxBytes: 50000, customError: streamError}, false,
streamError.message, done);
});
});
});

Expand Down Expand Up @@ -339,3 +405,36 @@ StrictMemoryTransform.prototype._transform = function _transform(chunk, encoding
}
tryPush();
}

util.inherits(AbortTransform, stream.Transform);

// Stream that will destroy itself once the maximum number of bytes are transformed
// When throttleFlow is true, the stream will avoid pushing a new chunk before the
// internal buffer is read
function AbortTransform(options) {
this._maxBytes = options.maxBytes;
this._customError = options.customError;
this._throttleFlow = options.throttleFlow;
this._currentBytes = 0;
stream.Transform.call(this, options);
}

AbortTransform.prototype._transform = function _transform(chunk, encoding, cb) {
var self = this;
function tryPush() {
if (self.readableLength === 0 || !self._throttleFlow) {
if (chunk.length + self._currentBytes < self._maxBytes) {
self.push(chunk);
self._currentBytes += chunk.length;
} else {
self.push(chunk.slice(0, self._maxBytes - self._currentBytes));
self._currentBytes = self._maxBytes;
self.destroy(self._customError);
}
cb();
} else {
setImmediate(tryPush);
}
}
tryPush();
}
Loading