Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/protocol/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ Connection.prototype.enqueue = function enqueue(task, cb) {
if (task instanceof request.Segment) {
queueable = this._queue.createTask(this.send.bind(this, task), cb);
queueable.name = MessageTypeName[task.type];
queueable.msgType = task.type;
} else if (util.isFunction(task.run)) {
queueable = task;
}
Expand Down Expand Up @@ -732,6 +733,10 @@ Connection.prototype.isIdle = function isIdle() {
return this._queue.empty && !this._queue.busy;
};

Connection.prototype.blockQueue = function blockQueue(blockingTask) {
this._queue.block(blockingTask);
}

Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) {
this._transaction.autoCommit = autoCommit;
};
Expand Down
139 changes: 83 additions & 56 deletions lib/protocol/ExecuteTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ function ExecuteTask(connection, options, callback) {
}
this.callback = callback;
this.reply = undefined;
this.finishedError = null;
this.finishedParameters = undefined;
this.isExecuteParams = true;
}

ExecuteTask.create = function createExecuteTask(connection, options, cb) {
Expand All @@ -59,60 +62,79 @@ ExecuteTask.prototype.run = function run(next) {
}
if (err) {
return self.sendRollback(function () {
// ignore roolback error
// ignore rollback error
done(err);
});
}
self.sendCommit(done);
}

function execute() {
function getExecuteRequest() {
if (!self.parameterValues.length && !self.writer.hasParameters) {
return finalize();
}
self.sendExecute(function receive(err, reply) {
if (err) {
return finalize(err);
}
if (!self.writer.finished && reply.rowsAffected == -1) {
reply.rowsAffected = undefined;
}
self.pushReply(reply);
if (!self.writer.finished && reply.writeLobReply) {
self.writer.update(reply.writeLobReply);
}
writeLob();
self.finishedParameters = undefined;
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;

// Block the queue to this task and read lob requests
self.connection.blockQueue(self);

self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
// Enqueue itself to wait for when the task becomes the one actively running in the queue
// and the connection is avaliable to send the packet
self.finishedError = err;
self.finishedParameters = parameters;
self.isExecuteParams = true;
self.connection.enqueue(self);
});

// Yield to only read lob tasks in the queue, the callback will enqueue this task
// again once the parameters are ready
next();
}

function writeLob() {
function getWriteLobRequest() {
if (self.writer.finished || self.writer.hasParameters) {
return execute();
return getExecuteRequest();
}
self.sendWriteLobRequest(function receive(err, reply) {
/* jshint unused:false */
if (err) {
return finalize(err);
}
self.pushReply(reply);
writeLob();
self.finishedParameters = undefined;
var availableSize = self.connection.getAvailableSize(true);
self.connection.blockQueue(self);
self.writer.getWriteLobRequest(availableSize, function (err, buffer) {
self.finishedError = err;
self.finishedParameters = buffer;
self.isExecuteParams = false;
self.connection.enqueue(self);
});

next();
}

// validate function code
if (self.parameterValues.length > 1) {
switch (self.functionCode) {
case FunctionCode.DDL:
case FunctionCode.INSERT:
case FunctionCode.UPDATE:
case FunctionCode.DELETE:
break;
default:
return done(createInvalidFunctionCodeError());
if (this.finishedError) {
finalize(this.finishedError);
} else if (this.finishedParameters) {
if (this.isExecuteParams) {
self.sendExecute(this.finishedParameters, finalize, getWriteLobRequest);
} else {
self.sendWriteLobRequest(this.finishedParameters, finalize, getWriteLobRequest);
}
} else { // No stored error or parameters, so get initial execute data
// validate function code
if (self.parameterValues.length > 1) {
switch (self.functionCode) {
case FunctionCode.DDL:
case FunctionCode.INSERT:
case FunctionCode.UPDATE:
case FunctionCode.DELETE:
break;
default:
return done(createInvalidFunctionCodeError());
}
}
}

execute();
getExecuteRequest();
}
};

ExecuteTask.prototype.end = function end(err) {
Expand Down Expand Up @@ -195,35 +217,40 @@ ExecuteTask.prototype.getParameters = function getParameters(availableSize, avai
next();
};

ExecuteTask.prototype.sendExecute = function sendExecute(cb) {
ExecuteTask.prototype.sendExecute = function sendExecute(parameters, finalize, cb) {
var self = this;
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
self.connection.send(request.execute({
autoCommit: self.autoCommit,
holdCursorsOverCommit: self.holdCursorsOverCommit,
scrollableCursor: self.scrollableCursor,
statementId: self.statementId,
parameters: parameters,
useCesu8: self.connection.useCesu8
}), function (err, reply) {
if (err) {
return cb(err);
return finalize(err);
}
self.connection.send(request.execute({
autoCommit: self.autoCommit,
holdCursorsOverCommit: self.holdCursorsOverCommit,
scrollableCursor: self.scrollableCursor,
statementId: self.statementId,
parameters: parameters,
useCesu8: self.connection.useCesu8
}), cb);
if (!self.writer.finished && reply.rowsAffected == -1) {
reply.rowsAffected = undefined;
}
self.pushReply(reply);
if (!self.writer.finished && reply.writeLobReply) {
self.writer.update(reply.writeLobReply);
}
cb();
});
};
}

ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) {
ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(buffer, finalize, cb) {
var self = this;
var availableSize = self.connection.getAvailableSize(true);
self.writer.getWriteLobRequest(availableSize, function send(err, buffer) {
self.connection.send(request.writeLob({
writeLobRequest: buffer
}), function (err, reply) {
if (err) {
return cb(err);
return finalize(err);
}
self.connection.send(request.writeLob({
writeLobRequest: buffer
}), cb);
self.pushReply(reply);
cb();
});
};

Expand Down
100 changes: 94 additions & 6 deletions lib/util/Queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

var util = require('util');
var EventEmitter = require('events').EventEmitter;
var MessageType = require('../protocol/common/MessageType');

module.exports = Queue;

Expand All @@ -26,6 +27,12 @@ function Queue(immediate) {
this.queue = [];
this.busy = false;
this.running = !!immediate;
// Records read lob tasks which can be called out of position when
// the queue is blocked. If other tasks need to be called out of position
// this can be changed to a Map with the message type as keys.
this.readLobQueue = [];
this.blocked = false;
this.blockingTask = undefined;
}

Object.defineProperty(Queue.prototype, 'empty', {
Expand All @@ -36,14 +43,25 @@ Object.defineProperty(Queue.prototype, 'empty', {

Queue.prototype.unshift = function unshift(task) {
this.queue.unshift(task);
if (this.running) {
if (task.msgType === MessageType.READ_LOB) {
this.readLobQueue.unshift(task);
}
if (this.blocked && this._isBlockingTask(task)) {
this.emit('unblock', task);
} else if (this.running) {
this.dequeue();
}
return this;
};

Queue.prototype.push = function push(task) {
if (this.blocked && this._isBlockingTask(task)) {
return this.unshift(task);
}
this.queue.push(task);
if (task.msgType === MessageType.READ_LOB) {
this.readLobQueue.push(task);
}
if (this.running) {
this.dequeue();
}
Expand Down Expand Up @@ -72,14 +90,28 @@ Queue.prototype.abort = function abort(err) {
return this;
};

Queue.prototype.createTask = function createTask(send, receive, name) {
return new Task(send, receive, name);
Queue.prototype.createTask = function createTask(send, receive, name, msgType) {
return new Task(send, receive, name, msgType);
};

Queue.prototype.block = function block(blockingTask) {
this.blocked = true;
this.blockingTask = blockingTask;
}

Queue.prototype.unblock = function unblock() {
this.blocked = false;
this.blockingTask = undefined;
}

Queue.prototype._isBlockingTask = function _isBlockingTask(task) {
return task === this.blockingTask || task.msgType === MessageType.READ_LOB;
}

Queue.prototype.dequeue = function dequeue() {
var self = this;

function next(err, name) {
function runNext() {
/* jshint unused:false */
self.busy = false;
if (self.queue.length) {
Expand All @@ -89,21 +121,77 @@ Queue.prototype.dequeue = function dequeue() {
}
}

function runReadLob() {
if (self.readLobQueue.length) {
self.busy = false;
if (self.running && !self.busy) {
self.busy = true;
var task = self.readLobQueue.shift();
// Mark the task as ran so it will be skipped in the queue
task.ran = true;
// Optimization: When blocked, often read lobs are the most recently
// added at the beginning or end of the queue so they can be removed from there
// Note that the queue is not empty since it always has at least as many elements
// as the readLobQueue
if (self.queue[0] === task) {
self.queue.shift();
} else if (self.queue[self.queue.length - 1] === task) {
self.queue.pop();
}
task.run(next);
}
} else {
runNext();
}
}

function next(err, name) {
if (self.blocked) {
// Check if there exists a task that can be run
if (self.queue.length && self.blockingTask === self.queue[0]) {
self.unblock();
runNext();
} else if (self.readLobQueue.length) {
runReadLob();
} else {
self.once('unblock', function runTask (task) {
if (task === self.blockingTask) {
self.unblock();
runNext();
} else {
runReadLob();
}
});
}
} else {
runNext();
}
}

function run() {
if (self.running && !self.busy) {
// Queue is running and not busy
self.busy = true;
var task = self.queue.shift();
task.run(next);
if (task.ran) {
next(null, task.name);
} else {
if (task.msgType === MessageType.READ_LOB) {
self.readLobQueue.shift();
}
task.run(next);
}
}
}
run();
};

function Task(send, receive, name) {
function Task(send, receive, name, msgType) {
this.send = send;
this.receive = receive;
this.name = name;
this.msgType = msgType;
this.ran = false;
}

Task.prototype.run = function run(next) {
Expand Down
Loading