bitmessage-js/lib/net/tcp.js

300 lines
8.6 KiB
JavaScript
Raw Normal View History

2015-02-01 21:34:47 +01:00
/**
2015-02-10 13:57:55 +01:00
* TCP transport compatible with PyBitmessage. Available only for Node
* platform.
* **NOTE**: `TcpTransport` is exported as a module.
* @example var TcpTransport = require("bitmessage/net/tcp");
2015-02-01 21:34:47 +01:00
* @module bitmessage/net/tcp
*/
"use strict";
2015-02-09 15:38:10 +01:00
var objectAssign = Object.assign || require("object-assign");
2015-02-01 21:34:47 +01:00
var inherits = require("inherits");
2015-02-05 19:38:36 +01:00
var net = require("net");
2015-02-05 20:55:36 +01:00
var dns = require("dns");
2015-02-05 19:38:36 +01:00
var assert = require("../_util").assert;
var PPromise = require("../platform").Promise;
2015-02-09 15:38:10 +01:00
var structs = require("../structs");
var messages = require("../messages");
2015-02-06 10:47:40 +01:00
var BaseTransport = require("./base");
2015-02-01 21:34:47 +01:00
2015-02-09 22:56:55 +01:00
var getmsg = BaseTransport._getmsg;
var unmap = BaseTransport._unmap;
2015-02-01 21:34:47 +01:00
/**
2015-02-10 13:57:55 +01:00
* TCP transport class. Implements
* [base transport interface]{@link module:bitmessage/net/base.BaseTransport}.
2015-02-10 19:23:20 +01:00
* @param {Object=} opts - Transport options
2015-02-10 15:59:49 +01:00
* @param {Array} opts.seeds - Bootstrap nodes (none by default)
* @param {Array} opts.dnsSeeds - Bootstrap DNS nodes (none by default)
* @param {Object} opts.services -
* [Service features]{@link module:bitmessage/structs.ServicesBitfield}
* provided by this node (`NODE_NETWORK` by default)
* @param {(Array|string|Buffer)} opts.userAgent -
* [User agent]{@link module:bitmessage/user-agent} of this node
* (bitmessage's by default)
* @param {number[]} opts.streamNumbers - Streams accepted by this node
* (1 by default)
* @param {number} opts.port - Incoming port of this node (8444 by
* default)
2015-02-01 21:34:47 +01:00
* @constructor
* @static
*/
2015-02-10 13:57:55 +01:00
function TcpTransport(opts) {
TcpTransport.super_.call(this);
2015-02-09 15:38:10 +01:00
objectAssign(this, opts);
this.seeds = this.seeds || [];
this.dnsSeeds = this.dnsSeeds || [];
2015-02-05 19:38:36 +01:00
this._clients = {};
2015-02-01 21:34:47 +01:00
}
2015-02-10 13:57:55 +01:00
inherits(TcpTransport, BaseTransport);
2015-02-01 21:34:47 +01:00
2015-02-09 22:56:55 +01:00
function getfrom(client) {
return unmap(client.remoteAddress) + ":" + client.remotePort;
2015-02-09 15:38:10 +01:00
}
2015-02-10 13:57:55 +01:00
TcpTransport.prototype._sendVersion = function() {
2015-02-09 15:38:10 +01:00
return this.send(messages.version.encode({
services: this.services,
userAgent: this.userAgent,
streamNumbers: this.streamNumbers,
port: this.port,
remoteHost: this._client.remoteAddress,
remotePort: this._client.remotePort,
}));
};
2015-02-10 13:57:55 +01:00
TcpTransport.prototype._setupClient = function(client, accepted) {
2015-02-05 19:38:36 +01:00
var self = this;
2015-02-09 22:56:55 +01:00
self._client = client;
2015-02-09 15:38:10 +01:00
var cache = Buffer(0);
var decoded;
var verackSent = false;
var verackReceived = false;
var established = false;
2015-02-05 19:38:36 +01:00
// Set default transport timeout per spec.
2015-02-09 15:38:10 +01:00
// TODO(Kagami): We may also want to close connection if it wasn't
// established within minute.
client.setTimeout(20000);
2015-02-05 19:38:36 +01:00
client.on("connect", function() {
2015-02-09 15:38:10 +01:00
// NOTE(Kagami): This handler shouldn't be called at all for
// accepted sockets but let's be sure.
2015-02-09 22:56:55 +01:00
if (!accepted) {
self.emit("open");
2015-02-10 13:57:55 +01:00
self._sendVersion();
2015-02-09 15:38:10 +01:00
}
});
client.on("data", function(data) {
// TODO(Kagami): We may want to preallocate 1.6M buffer for each
// client instead (max size of the message) to not constantly
// allocate new buffers. Though this may lead to another issues: too
// many memory per client.
cache = Buffer.concat([cache, data]);
while (true) {
decoded = structs.message.tryDecode(cache);
if (!decoded) {
break;
}
cache = decoded.rest;
if (decoded.message) {
self.emit(
"message",
decoded.message.command,
decoded.message.payload,
decoded.message);
} else if (decoded.error) {
// TODO(Kagami): Wrap it in custom error class?
// TODO(Kagami): Send `error` message and ban node for some time
// if there were too many errors?
self.emit("warning", new Error(
2015-02-09 22:56:55 +01:00
"Message decoding error from " + getfrom(client) + ": " +
2015-02-09 15:38:10 +01:00
decoded.error
));
}
}
});
// High-level message processing.
self.on("message", function(command) {
if (!established) {
// TODO: Process version data.
if (command === "version") {
if (verackSent) {
return;
}
self.send("verack");
verackSent = true;
2015-02-09 22:56:55 +01:00
if (accepted) {
2015-02-10 13:57:55 +01:00
self._sendVersion();
2015-02-09 22:56:55 +01:00
} else if (verackReceived) {
2015-02-09 15:38:10 +01:00
self.emit("established");
}
} else if (command === "verack") {
verackReceived = true;
if (verackSent) {
self.emit("established");
}
}
}
2015-02-05 19:38:36 +01:00
});
2015-02-09 15:38:10 +01:00
self.on("established", function() {
established = true;
// Raise timeout up to 10 minutes per spec.
// TODO(Kagami): Send pong messages every 5 minutes as PyBitmessage.
client.setTimeout(600000);
2015-02-05 19:38:36 +01:00
});
client.on("timeout", function() {
client.end();
});
client.on("error", function(err) {
self.emit("error", err);
});
client.on("close", function() {
self.emit("close");
delete self._client;
});
};
2015-02-05 20:55:36 +01:00
function resolveDnsSeed(seed) {
var host = seed[0];
var port = seed[1];
var nodes = [];
// NOTE(Kagami):
// 1) Node's `getaddrinfo` (`dns.lookup`) returns only one address so
// we can't use it.
// 2) Node's `dig host any` (`dns.resolve`) doesn't return type of the
// record! So we resolve twice for A and AAAA.
// 3) We ignore any errors here, promise's result is always a list.
return new PPromise(function(resolve) {
dns.resolve4(host, function(err, nodes4) {
if (!err) {
nodes4.forEach(function(n) {
nodes.push([n, port]);
});
}
dns.resolve6(host, function(err, nodes6) {
if (!err) {
nodes6.forEach(function(n) {
nodes.push([n, port]);
});
}
resolve(nodes);
});
});
});
}
2015-02-10 13:57:55 +01:00
TcpTransport.prototype.bootstrap = function() {
2015-02-05 20:55:36 +01:00
var promises = this.dnsSeeds.map(resolveDnsSeed);
var hardcodedNodes = this.seeds;
// FIXME(Kagami): Filter incorrect/private IP range nodes?
// See also: <https://github.com/Bitmessage/PyBitmessage/issues/768>.
return PPromise.all(promises).then(function(dnsNodes) {
2015-02-09 15:38:10 +01:00
// Flatten array of arrays.
dnsNodes = Array.prototype.concat.apply([], dnsNodes);
2015-02-05 20:55:36 +01:00
// Add hardcoded nodes to the end of list because DNS nodes should
// be more up-to-date.
return dnsNodes.concat(hardcodedNodes);
});
2015-02-05 19:38:36 +01:00
};
2015-02-10 13:57:55 +01:00
/**
* Connect to a TCP node. Connection arguments are the same as for
* [net.connect](http://nodejs.org/api/net.html#net_net_connect_port_host_connectlistener).
*/
TcpTransport.prototype.connect = function() {
2015-02-05 19:38:36 +01:00
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
2015-02-09 22:56:55 +01:00
this._setupClient(net.connect.apply(null, arguments));
2015-02-05 19:38:36 +01:00
};
2015-02-10 13:57:55 +01:00
/**
* Listen for incoming TCP connections. Listen arguments are the same as
* for
* [server.listen](http://nodejs.org/api/net.html#net_server_listen_port_host_backlog_callback).
*/
TcpTransport.prototype.listen = function() {
2015-02-05 19:38:36 +01:00
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
var self = this;
var server = self._server = net.createServer();
server.listen.apply(server, arguments);
2015-02-09 15:38:10 +01:00
// TODO(Kagami): We may want to specify some limits for number of
// connected users.
server.on("connection", function(client) {
var addr = client.remoteAddress;
var port = client.remotePort;
2015-02-06 14:04:17 +01:00
if (self._clients[addr]) {
// NOTE(Kagami): Doesn't allow more than one connection per IP.
// This may obstruct people behind NAT but we copy PyBitmessage's
// behavior here.
2015-02-09 15:38:10 +01:00
client.end();
2015-02-09 22:56:55 +01:00
return self.emit("warning", new Error(
unmap(addr) + " was tried to create second connection"
2015-02-09 15:38:10 +01:00
));
2015-02-06 14:04:17 +01:00
}
2015-02-09 15:38:10 +01:00
self._clients[addr] = client;
client.on("close", function() {
2015-02-06 14:04:17 +01:00
delete self._clients[addr];
2015-02-05 19:38:36 +01:00
});
2015-02-09 22:56:55 +01:00
var transport = new self.constructor(self);
var accepted = true;
transport._setupClient(client, accepted);
2015-02-06 14:04:17 +01:00
self.emit("connection", transport, unmap(addr), port);
2015-02-09 22:56:55 +01:00
// Emit "open" manually because "connect" for this socket won't be
// fired.
2015-02-09 15:38:10 +01:00
transport.emit("open");
2015-02-05 19:38:36 +01:00
});
server.on("error", function(err) {
self.emit("error", err);
});
server.on("close", function() {
self.emit("close");
delete self._server;
});
};
2015-02-10 13:57:55 +01:00
TcpTransport.prototype.send = function() {
2015-02-05 19:38:36 +01:00
if (this._client) {
2015-02-09 15:38:10 +01:00
this._client.write(getmsg(arguments));
2015-02-05 19:38:36 +01:00
} else {
throw new Error("Not connected");
}
};
2015-02-10 13:57:55 +01:00
TcpTransport.prototype.broadcast = function() {
2015-02-09 15:38:10 +01:00
var data = getmsg(arguments);
2015-02-05 19:38:36 +01:00
if (this._server) {
2015-02-06 14:04:17 +01:00
Object.keys(this._clients).forEach(function(ip) {
this._clients[ip].write(data);
2015-02-05 19:38:36 +01:00
}, this);
} else {
throw new Error("Not listening");
}
};
2015-02-10 13:57:55 +01:00
TcpTransport.prototype.close = function() {
2015-02-05 19:38:36 +01:00
if (this._client) {
this._client.end();
} else if (this._server) {
2015-02-06 14:04:17 +01:00
Object.keys(this._clients).forEach(function(ip) {
this._clients[ip].end();
2015-02-05 19:38:36 +01:00
}, this);
this._server.close();
}
};
2015-02-10 13:57:55 +01:00
module.exports = TcpTransport;