/**
 * TCP transport compatible with PyBitmessage. Available only for Node
 * platform.
 * **NOTE**: `TcpTransport` is exported as a module.
 * @module bitmessage/net/tcp
 * @example
 * var messages = require("bitmessage").messages;
 * var TcpTransport = require("bitmessage/lib/net/tcp");
 *
 * var tcp = new TcpTransport({
 *   dnsSeeds: [["bootstrap8444.bitmessage.org", 8444]],
 * });
 *
 * tcp.bootstrap().then(function(nodes) {
 *   var remoteHost = nodes[0][0];
 *   var remotePort = nodes[0][1];
 *   console.log("Connecting to", nodes[0]);
 *   tcp.connect(remotePort, remoteHost);
 * });
 *
 * tcp.on("established", function(version) {
 *   console.log("Connection established to", version.userAgent);
 *
 *   tcp.on("message", function(command, payload) {
 *     console.log("Got new", command, "message");
 *     var decoded;
 *     if (command === "addr") {
 *       decoded = messages.addr.decodePayload(payload);
 *       console.log("Got", decoded.addrs.length, "node addresses");
 *     }
 *   });
 * });
 */

"use strict";

var objectAssign = Object.assign || require("object-assign");
var inherits = require("inherits");
var net = require("net");
var dns = require("dns");
var util = require("../_util");
var PPromise = require("../platform").Promise;
var structs = require("../structs");
var messages = require("../messages");
var BaseTransport = require("./base");

var assert = util.assert;
var getmsg = BaseTransport._getmsg;
var unmap = BaseTransport._unmap;

/**
 * TCP transport class. Implements [base transport interface]{@link
 * module:bitmessage/net/base.BaseTransport}.
 * @param {Object=} opts - Transport options
 * @param {Array} opts.seeds - Bootstrap nodes (none by default)
 * @param {Array} opts.dnsSeeds - Bootstrap DNS nodes (none by default)
 * @param {Object} opts.services -
 * [Service features]{@link module:bitmessage/structs.ServicesBitfield}
 * provided by this node (`NODE_NETWORK` by default)
 * @param {(Array|string|Buffer)} opts.userAgent -
 * [User agent]{@link module:bitmessage/user-agent} of this node
 * (user agent of bitmessage library by default)
 * @param {number[]} opts.streams - Streams accepted by this node ([1]
 * by default)
 * @param {number} opts.port - Incoming port of this node (8444 by
 * default)
 * @constructor
 * @static
 */
function TcpTransport(opts) {
  TcpTransport.super_.call(this);
  // Options are stored directly on the instance.
  objectAssign(this, opts);
  this.seeds = this.seeds || [];
  this.dnsSeeds = this.dnsSeeds || [];
  this.streams = this.streams || [1];
  // Sockets of incoming connections (used in server mode), keyed by id.
  this._clients = {};
}

inherits(TcpTransport, BaseTransport);

/**
 * Encode and send our `version` message to the currently connected
 * peer, using this transport's options and the socket's remote
 * address/port.
 * @private
 */
TcpTransport.prototype._sendVersion = function() {
  return this.send(messages.version.encode({
    services: this.services,
    userAgent: this.userAgent,
    streams: this.streams,
    port: this.port,
    remoteHost: this._client.remoteAddress,
    remotePort: this._client.remotePort,
  }));
};

/**
 * Attach the given socket to this transport: decode the incoming byte
 * stream into messages, drive the version/verack handshake and forward
 * socket events as transport events.
 * @param {Object} client - Socket returned by `net.connect` or passed
 * to the server's `connection` event
 * @param {boolean=} incoming - Truthy when the remote side initiated
 * the connection
 * @private
 */
TcpTransport.prototype._setupClient = function(client, incoming) {
  var self = this;
  self._client = client;
  // `Buffer(0)` is deprecated; fall back to the constructor only on
  // runtimes that lack `Buffer.alloc`.
  var cache = Buffer.alloc ? Buffer.alloc(0) : new Buffer(0);
  var decoded;
  var verackSent = false;
  var verackReceived = false;
  var established = false;
  // Decoded `version` of the remote node. Kept in the connection scope
  // (not in the message handler) because `verack` may arrive in a later
  // message than `version`, and `established` must still carry it.
  var remoteVersion;

  // Set default transport timeout per spec.
  // TODO(Kagami): We may also want to close connection if it wasn't
  // established within minute.
  client.setTimeout(20000);

  client.on("connect", function() {
    // NOTE(Kagami): This handler shouldn't be called at all for
    // incoming connections but let's be sure.
    if (!incoming) {
      self.emit("open");
      self._sendVersion();
    }
  });

  client.on("data", function(data) {
    // TODO(Kagami): We may want to preallocate 1.6M buffer for each
    // client instead (max size of the message) to not constantly
    // allocate new buffers. Though this may lead to other issues: too
    // much memory per client.
    cache = Buffer.concat([cache, data]);
    while (true) {
      decoded = structs.message.tryDecode(cache);
      if (!decoded) {
        // Nothing more can be decoded from the cached data for now.
        break;
      }
      cache = decoded.rest;
      if (decoded.message) {
        self.emit(
          "message",
          decoded.message.command,
          decoded.message.payload);
      } else if (decoded.error) {
        // TODO(Kagami): Wrap it in custom error class?
        // TODO(Kagami): Send `error` message and ban node for some time
        // if there were too many errors?
        self.emit("warning", new Error(
          "Message decoding error: " + decoded.error.message
        ));
      }
    }
  });

  // High-level message processing.
  self.on("message", function(command, payload) {
    if (established) {
      return;
    }
    if (command === "version") {
      if (verackSent) {
        // Ignore repeated `version` messages.
        return;
      }
      try {
        remoteVersion = self._decodeVersion(payload, {network: true});
      } catch(err) {
        self.emit("error", err);
        return client.end();
      }
      self.send("verack");
      verackSent = true;
      if (incoming) {
        // Remote side speaks first; answer with our own version.
        self._sendVersion();
      } else if (verackReceived) {
        self.emit("established", remoteVersion);
      }
    } else if (command === "verack") {
      verackReceived = true;
      if (verackSent) {
        self.emit("established", remoteVersion);
      }
    }
  });

  self.on("established", function() {
    established = true;
    // Raise timeout up to 10 minutes per spec.
    // TODO(Kagami): Send pong messages every 5 minutes as PyBitmessage.
    client.setTimeout(600000);
  });

  client.on("timeout", function() {
    client.end();
  });

  client.on("error", function(err) {
    self.emit("error", err);
  });

  client.on("close", function() {
    self.emit("close");
    delete self._client;
  });
};

/**
 * Resolve a single DNS seed into a list of `[address, port]` pairs.
 * @param {Array} seed - `[host, port]` pair
 * @return {Promise.<Array>} Resolved nodes; the promise is never
 * rejected, failed lookups just contribute no nodes.
 * @private
 */
function resolveDnsSeed(seed) {
  var host = seed[0];
  var port = seed[1];
  var nodes = [];
  // NOTE(Kagami):
  // 1) Node's `getaddrinfo` (`dns.lookup`) returns only one address so
  // we can't use it.
  // 2) Node's `dig host any` (`dns.resolve`) doesn't return type of the
  // record! So we resolve twice for A and AAAA.
  // 3) We ignore any errors here, promise's result is always a list.
  return new PPromise(function(resolve) {
    dns.resolve4(host, function(err, nodes4) {
      if (!err) {
        nodes4.forEach(function(n) {
          nodes.push([n, port]);
        });
      }
      dns.resolve6(host, function(err, nodes6) {
        if (!err) {
          nodes6.forEach(function(n) {
            nodes.push([n, port]);
          });
        }
        resolve(nodes);
      });
    });
  });
}

/**
 * Discover bootstrap nodes: nodes resolved from `dnsSeeds` followed by
 * the hardcoded `seeds`.
 * @return {Promise.<Array>} Discovered seed nodes.
 */
TcpTransport.prototype.bootstrap = function() {
  var hardcodedNodes = this.seeds;
  // FIXME(Kagami): Filter incorrect/private IP range nodes?
  return this.bootstrapDns().then(function(dnsNodes) {
    // Add hardcoded nodes to the end of list because DNS nodes should
    // be more up-to-date.
    return dnsNodes.concat(hardcodedNodes);
  });
};

/**
 * Do only DNS-specific bootstrap.
 * @return {Promise.<Array>} Discovered seed nodes.
 */
TcpTransport.prototype.bootstrapDns = function() {
  var promises = this.dnsSeeds.map(resolveDnsSeed);
  return PPromise.all(promises).then(function(dnsNodes) {
    // Flatten array of arrays.
    return Array.prototype.concat.apply([], dnsNodes);
  });
};

/**
 * Connect to a TCP node. Connection arguments are the same as for
 * [net.connect](http://nodejs.org/api/net.html#net_net_connect_port_host_connectlistener).
 */
TcpTransport.prototype.connect = function() {
  assert(!this._client, "Already connected");
  assert(!this._server, "Already listening");

  this._setupClient(net.connect.apply(null, arguments));
};

/**
 * Listen for incoming TCP connections. Listen arguments are the same as
 * for
 * [server.listen](http://nodejs.org/api/net.html#net_server_listen_port_host_backlog_callback).
 */
TcpTransport.prototype.listen = function() {
  assert(!this._client, "Already connected");
  assert(!this._server, "Already listening");

  var self = this;
  var server = self._server = net.createServer();
  server.listen.apply(server, arguments);

  var clientIdCounter = 0;

  server.on("connection", function(client) {
    var id = client.id = clientIdCounter++;
    self._clients[id] = client;
    client.on("close", function() {
      delete self._clients[id];
    });

    // Pass only options to the per-connection transport. Underscore
    // prefixed own properties are internal state (`_server`,
    // `_clients`, and presumably the event emitter's own `_events`
    // table if BaseTransport is an EventEmitter); copying them would
    // make the child share that state with the listening transport.
    var opts = {};
    Object.keys(self).forEach(function(key) {
      if (key.charAt(0) !== "_") {
        opts[key] = self[key];
      }
    });
    var transport = new self.constructor(opts);
    var incoming = true;
    transport._setupClient(client, incoming);
    var addr = client.remoteAddress;
    var port = client.remotePort;
    self.emit("connection", transport, unmap(addr), port);
  });

  server.on("error", function(err) {
    self.emit("error", err);
  });

  server.on("close", function() {
    self.emit("close");
    delete self._server;
  });
};

/**
 * Send a message to the connected node. Arguments are handed over to
 * the base transport's `_getmsg` helper; within this module it is
 * called either with a command name (e.g. `"verack"`) or with an
 * already encoded message.
 * @throws {Error} When the transport is not connected.
 */
TcpTransport.prototype.send = function() {
  if (this._client) {
    this._client.write(getmsg(arguments));
  } else {
    throw new Error("Not connected");
  }
};

/**
 * Send a message to all currently connected incoming clients.
 * Arguments are the same as for `send`.
 * @throws {Error} When the transport is not listening.
 */
TcpTransport.prototype.broadcast = function() {
  var data = getmsg(arguments);
  if (this._server) {
    Object.keys(this._clients).forEach(function(id) {
      this._clients[id].write(data);
    }, this);
  } else {
    throw new Error("Not listening");
  }
};

/**
 * Close the transport: end the outgoing connection if there is one,
 * otherwise end every incoming client and stop the server. Does
 * nothing when neither connected nor listening.
 */
TcpTransport.prototype.close = function() {
  if (this._client) {
    this._client.end();
  } else if (this._server) {
    Object.keys(this._clients).forEach(function(id) {
      this._clients[id].end();
    }, this);
    this._server.close();
  }
};

module.exports = TcpTransport;