Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Downloaded npm modules
node_modules/
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ Methods

* password - _string_ - Password for authentication. **Default:** 'anonymous@'

* passive - _boolean_ - Allow to choose between FTP passive and active modes. If false, 'activeIp' option should be set. **Default:** true

* activeIp - _string - The IP address FTP server will use during active connection (ie. this client IP address, reachable from FTP server). Required if 'passive' option is set to false. **Default:** '127.0.0.1'

* connTimeout - _integer_ - How long (in milliseconds) to wait for the control connection to be established. **Default:** 10000

* pasvTimeout - _integer_ - How long (in milliseconds) to wait for a PASV data connection to be established. **Default:** 10000
Expand Down Expand Up @@ -139,7 +143,7 @@ Methods
* group - _string_ - An empty string or any combination of 'r', 'w', 'x'.

* other - _string_ - An empty string or any combination of 'r', 'w', 'x'.

* owner - _string_ - The user name or ID that this entry belongs to **(*NIX only)**.

* group - _string_ - The group name or ID that this entry belongs to **(*NIX only)**.
Expand Down
199 changes: 150 additions & 49 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var fs = require('fs'),
tls = require('tls'),
zlib = require('zlib'),
Socket = require('net').Socket,
net = require('net'),
EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits,
inspect = require('util').inspect,
Expand Down Expand Up @@ -66,13 +66,17 @@ var FTP = module.exports = function() {
this._keepalive = undefined;
this._ending = false;
this._parser = undefined;
this._actvPort = undefined;
this._actvSock = undefined;
this.options = {
host: undefined,
port: undefined,
user: undefined,
password: undefined,
secure: false,
secureOptions: undefined,
passive: true,
activeIp: undefined,
connTimeout: undefined,
pasvTimeout: undefined,
aliveTimeout: undefined
Expand All @@ -94,16 +98,23 @@ FTP.prototype.connect = function(options) {
: 'anonymous@';
this.options.secure = options.secure || false;
this.options.secureOptions = options.secureOptions;
this.options.passive = options.passive === undefined ? true : options.passive;
this.options.activeIp = options.activeIp;
this.options.connTimeout = options.connTimeout || 10000;
this.options.pasvTimeout = options.pasvTimeout || 10000;
this.options.aliveTimeout = options.keepalive || 10000;

if (typeof options.debug === 'function')
this._debug = options.debug;

if (!self.options.passive && !self.options.activeIp){
self.emit('error', new Error('Missing active IP option, fallback to 127.0.0.1'));
self.options.activeIp = '127.0.0.1';
}

var secureOptions,
debug = this._debug,
socket = new Socket();
socket = new net.Socket();

socket.setTimeout(0);
socket.setKeepAlive(true);
Expand Down Expand Up @@ -148,6 +159,49 @@ FTP.prototype.connect = function(options) {
this._socket = socket;
}

if(!this.options.passive){
var server = net.createServer(function(socket) {
debug&&debug('[connection] ACTV socket connected');
if(self._actvSock){
self.emit('error', new Error('Unhandled multiple active connections.'));
return;
}
self._actvSock = socket;

socket.once('end', function(){
self._actvSock = undefined;
});

socket.once('close', function(had_err) {
self._actvSock = undefined;
});

if(self._actvHandler){
self._actvHandler(socket);
self._actvHandler = undefined;
} else {
socket.on('data', function(chunk) {
debug&&debug('[active] < ' + inspect(chunk.toString('binary')));
if (self._parser)
self._parser.write(chunk);
});

socket.on('error', function(err) {
clearTimeout(self._keepalive);
self.emit('error', err);
});
}
});
server.on('error', function(err) {
self.emit('error', new Error('Listening server error.'));
self._reset();
});
server.listen(function() {
self._actvPort = server.address().port;
debug&&debug('[active] Listening on', self._actvPort);
});
}

var noopreq = {
cmd: 'NOOP',
cb: function() {
Expand Down Expand Up @@ -426,7 +480,7 @@ FTP.prototype.list = function(path, zcomp, cb) {
} else
cmd = 'LIST ' + path;

this._pasv(function(err, sock) {
this._dataconn(function(err, sock) {
if (err)
return cb(err);

Expand All @@ -435,24 +489,32 @@ FTP.prototype.list = function(path, zcomp, cb) {
return cb();
}

var sockerr, done = false, replies = 0, entries, buffer = '', source = sock;
var sockerr, done = false, replies = 0, entries, buffer = '';
var decoder = new StringDecoder('utf8');

if (zcomp) {
source = zlib.createInflate();
sock.pipe(source);
// passive connection already got actual socket
if(self.options.passive){
readFromSource(sock)
} else {
self._actvHandler = readFromSource;
}

source.on('data', function(chunk) {
buffer += decoder.write(chunk);
});
source.once('error', function(err) {
if (!sock.aborting)
sockerr = err;
});
source.once('end', ondone);
source.once('close', ondone);
function readFromSource(source){
if (zcomp) {
source = zlib.createInflate();
sock.pipe(source);
}

source.on('data', function(chunk) {
buffer += decoder.write(chunk);
});
source.once('error', function(err) {
if (!sock.aborting)
sockerr = err;
});
source.once('end', ondone);
source.once('close', ondone);
}
function ondone() {
if (decoder) {
buffer += decoder.end();
Expand Down Expand Up @@ -534,7 +596,7 @@ FTP.prototype.get = function(path, zcomp, cb) {
zcomp = false;
}

this._pasv(function(err, sock) {
this._dataconn(function(err, sock) {
if (err)
return cb(err);

Expand All @@ -546,39 +608,51 @@ FTP.prototype.get = function(path, zcomp, cb) {
// modify behavior of socket events so that we can emit 'error' once for
// either a TCP-level error OR an FTP-level error response that we get when
// the socket is closed (e.g. the server ran out of space).
var sockerr, started = false, lastreply = false, done = false,
source = sock;
var sockerr, started = false, lastreply = false, done = false, source;

if (zcomp) {
source = zlib.createInflate();
sock.pipe(source);
sock._emit = sock.emit;
sock.emit = function(ev, arg1) {
if(self.options.passive){
readFromSource(sock);
} else {
self._actvHandler = function(sock){
readFromSource(sock);
cb(undefined, source);
cb = undefined;
};
}

function readFromSource(sock){
source = sock;
if (zcomp) {
source = zlib.createInflate();
sock.pipe(source);
sock._emit = sock.emit;
sock.emit = function(ev, arg1) {
if (ev === 'error') {
if (!sockerr)
sockerr = arg1;
return;
}
sock._emit.apply(sock, Array.prototype.slice.call(arguments));
};
}

source._emit = source.emit;
source.emit = function(ev, arg1) {
if (ev === 'error') {
if (!sockerr)
sockerr = arg1;
return;
} else if (ev === 'end' || ev === 'close') {
if (!done) {
done = true;
ondone();
}
return;
}
sock._emit.apply(sock, Array.prototype.slice.call(arguments));
source._emit.apply(source, Array.prototype.slice.call(arguments));
};
}

source._emit = source.emit;
source.emit = function(ev, arg1) {
if (ev === 'error') {
if (!sockerr)
sockerr = arg1;
return;
} else if (ev === 'end' || ev === 'close') {
if (!done) {
done = true;
ondone();
}
return;
}
source._emit.apply(source, Array.prototype.slice.call(arguments));
};

function ondone() {
if (done && lastreply) {
self._send('MODE S', function() {
Expand Down Expand Up @@ -625,7 +699,7 @@ FTP.prototype.get = function(path, zcomp, cb) {
// just like a 150
if (code === 150 || code === 125) {
started = true;
cb(undefined, source);
cb&&cb(undefined, source);
} else {
lastreply = true;
ondone();
Expand Down Expand Up @@ -743,12 +817,12 @@ FTP.prototype.rmdir = function(path, recursive, cb) { // RMD is optional
if (!recursive) {
return this._send('RMD ' + path, cb);
}

var self = this;
this.list(path, function(err, list) {
if (err) return cb(err);
var idx = 0;

// this function will be called once per listing entry
var deleteNextEntry;
deleteNextEntry = function(err) {
Expand All @@ -760,9 +834,9 @@ FTP.prototype.rmdir = function(path, recursive, cb) { // RMD is optional
return self.rmdir(path, cb);
}
}

var entry = list[idx++];

// get the path to the file
var subpath = null;
if (entry.name[0] === '/') {
Expand All @@ -776,7 +850,7 @@ FTP.prototype.rmdir = function(path, recursive, cb) { // RMD is optional
subpath = path + '/' + entry.name
}
}

// delete the entry (recursively) according to its type
if (entry.type === 'd') {
if (entry.name === "." || entry.name === "..") {
Expand Down Expand Up @@ -853,6 +927,14 @@ FTP.prototype.restart = function(offset, cb) {


// Private/Internal methods
FTP.prototype._dataconn = function(cb){
if(this.options.passive) {
this._pasv(cb);
} else {
this._actv(cb);
}
};

FTP.prototype._pasv = function(cb) {
var self = this, first = true, ip, port;
this._send('PASV', function reentry(err, text) {
Expand Down Expand Up @@ -902,7 +984,7 @@ FTP.prototype._pasv = function(cb) {

FTP.prototype._pasvConnect = function(ip, port, cb) {
var self = this,
socket = new Socket(),
socket = new net.Socket(),
sockerr,
timedOut = false,
timer = setTimeout(function() {
Expand Down Expand Up @@ -950,6 +1032,19 @@ FTP.prototype._pasvConnect = function(ip, port, cb) {
socket.connect(port, ip);
};

FTP.prototype._actv = function(cb) {
var self = this,
ip = self.options.activeIp.replace(/\./g,','),
port = parseInt(self._actvPort / 256) + ',' + (self._actvPort % 256);

this._send('PORT ' + ip + ',' + port, function(err, text, code) {
if(err){
cb(new Error(err));
}
cb(undefined, self._socket);
});
}

FTP.prototype._store = function(cmd, input, zcomp, cb) {
var isBuffer = Buffer.isBuffer(input);

Expand All @@ -962,7 +1057,7 @@ FTP.prototype._store = function(cmd, input, zcomp, cb) {
}

var self = this;
this._pasv(function(err, sock) {
this._dataconn(function(err, sock) {
if (err)
return cb(err);

Expand Down Expand Up @@ -1006,6 +1101,9 @@ FTP.prototype._store = function(cmd, input, zcomp, cb) {
}

if (code === 150 || code === 125) {
if(!self.options.passive){
dest = self._actvSock;
}
if (isBuffer)
dest.end(input);
else if (typeof input === 'string') {
Expand Down Expand Up @@ -1053,10 +1151,13 @@ FTP.prototype._send = function(cmd, cb, promote) {
FTP.prototype._reset = function() {
if (this._pasvSock && this._pasvSock.writable)
this._pasvSock.end();
if (this._actvSock && this._actvSock.writable)
this._actvSock.end();
if (this._socket && this._socket.writable)
this._socket.end();
this._socket = undefined;
this._pasvSock = undefined;
this._actvSock = undefined;
this._feat = undefined;
this._curReq = undefined;
this._secstate = undefined;
Expand Down