diff --git a/README.md b/README.md index 2ec3ef4..d4b7abe 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ API documentation is available [here](https://bitchan.github.io/bitmessage/docs/ - [x] Address - [x] UserAgent - [ ] Network transports - - [ ] TCP (Node.js only) + - [x] TCP (Node.js only) - [ ] WebSocket - [ ] WebRTC - [ ] Parse PyBitmessage configs diff --git a/lib/messages.js b/lib/messages.js index 7fffa82..a14d132 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -54,6 +54,8 @@ var randomNonce = bmcrypto.randomBytes(8); * @namespace * @static */ +// TODO(Kagami): User agent and stream numbers size limits per +// . var version = exports.version = { /** * Decode `version` message. @@ -127,6 +129,7 @@ var version = exports.version = { var time = opts.time || new Date(); var nonce = opts.nonce || randomNonce; assert(nonce.length === 8, "Bad nonce"); + var port = opts.port || 8444; var userAgent = opts.userAgent || UserAgent.SELF; var streamNumbers = opts.streamNumbers || [1]; // Start encoding. @@ -141,7 +144,7 @@ var version = exports.version = { var addrFrom = structs.net_addr.encode({ services: services, host: "127.0.0.1", - port: opts.port, + port: port, short: true, }); return Buffer.concat([ diff --git a/lib/net/base.js b/lib/net/base.js index 184abe1..0d03502 100644 --- a/lib/net/base.js +++ b/lib/net/base.js @@ -55,7 +55,9 @@ BaseTransport.prototype.listen = function() { /** * Send [message]{@link module:bitmessage/structs.message} over the * wire (client mode). - * @param {Buffer} msg - Encoded message + * @param {(Buffer|string)} msg - Encoded message or command string + * @param (?Buffer} payload - Message payload (used if the first + * argument is a string) * @abstract */ BaseTransport.prototype.send = function() { @@ -65,7 +67,9 @@ BaseTransport.prototype.send = function() { /** * Send [message]{@link module:bitmessage/structs.message} to all * connected clients (server mode). - * @param {Buffer} msg - Encoded message + * @param {(Buffer|string)} msg - Encoded message or command string + * @param (?Buffer} payload - Message payload (used if the first + * argument is a string) * @abstract */ BaseTransport.prototype.broadcast = function() { diff --git a/lib/net/tcp.js b/lib/net/tcp.js index 3a51030..4306d5d 100644 --- a/lib/net/tcp.js +++ b/lib/net/tcp.js @@ -6,11 +6,14 @@ "use strict"; +var objectAssign = Object.assign || require("object-assign"); var inherits = require("inherits"); var net = require("net"); var dns = require("dns"); var assert = require("../_util").assert; var PPromise = require("../platform").Promise; +var structs = require("../structs"); +var messages = require("../messages"); var BaseTransport = require("./base"); /** @@ -20,30 +23,122 @@ var BaseTransport = require("./base"); */ function Transport(opts) { Transport.super_.call(this); - opts = opts || {}; - this.seeds = opts.seeds || []; - this.dnsSeeds = opts.dnsSeeds || []; - if (opts.client) { - this._setupClient(opts.client); - } - // To track connected clients in server mode. + objectAssign(this, opts); + this.seeds = this.seeds || []; + this.dnsSeeds = this.dnsSeeds || []; this._clients = {}; + if (this._client) { + this._setupClient(); + } } inherits(Transport, BaseTransport); -Transport.prototype._setupClient = function(client) { +// Unmap IPv4-mapped IPv6 address. +function unmap(addr) { + if (addr.slice(0, 7) === "::ffff:") { + return addr.slice(7); + } else { + return addr; + } +} + +Transport.prototype.sendVersion = function() { + return this.send(messages.version.encode({ + services: this.services, + userAgent: this.userAgent, + streamNumbers: this.streamNumbers, + port: this.port, + remoteHost: this._client.remoteAddress, + remotePort: this._client.remotePort, + })); +}; + +Transport.prototype._setupClient = function() { var self = this; - self._client = client; + var client = self._client; + var cache = Buffer(0); + var decoded; + var verackSent = false; + var verackReceived = false; + var established = false; // Set default transport timeout per spec. - client.setTimeout(20); + // TODO(Kagami): We may also want to close connection if it wasn't + // established within minute. + client.setTimeout(20000); client.on("connect", function() { self.emit("open"); + // NOTE(Kagami): This handler shouldn't be called at all for + // accepted sockets but let's be sure. + if (!self._accepted) { + self.sendVersion(); + } }); - client.on("data", function() { + client.on("data", function(data) { + // TODO(Kagami): We may want to preallocate 1.6M buffer for each + // client instead (max size of the message) to not constantly + // allocate new buffers. Though this may lead to another issues: too + // many memory per client. + cache = Buffer.concat([cache, data]); + while (true) { + decoded = structs.message.tryDecode(cache); + if (!decoded) { + break; + } + cache = decoded.rest; + if (decoded.message) { + self.emit( + "message", + decoded.message.command, + decoded.message.payload, + decoded.message); + } else if (decoded.error) { + // TODO(Kagami): Wrap it in custom error class? + // TODO(Kagami): Send `error` message and ban node for some time + // if there were too many errors? + self.emit("warning", new Error( + "Message decoding error from " + + unmap(client.remoteAddress) + ":" + client.remotePort, + ": " + + decoded.error + )); + } + } + }); + + // High-level message processing. + self.on("message", function(command) { + if (!established) { + // TODO: Process version data. + if (command === "version") { + if (verackSent) { + return; + } + self.send("verack"); + verackSent = true; + if (self._accepted) { + self.sendVersion(); + } + if (verackReceived) { + self.emit("established"); + } + } else if (command === "verack") { + verackReceived = true; + if (verackSent) { + self.emit("established"); + } + } + } + }); + + self.on("established", function() { + established = true; + // Raise timeout up to 10 minutes per spec. + // TODO(Kagami): Send pong messages every 5 minutes as PyBitmessage. + client.setTimeout(600000); }); client.on("timeout", function() { @@ -57,6 +152,9 @@ Transport.prototype._setupClient = function(client) { client.on("close", function() { self.emit("close"); delete self._client; + verackSent = false; + verackReceived = false; + established = false; }); }; @@ -95,10 +193,10 @@ Transport.prototype.bootstrap = function() { // FIXME(Kagami): Filter incorrect/private IP range nodes? // See also: . return PPromise.all(promises).then(function(dnsNodes) { + // Flatten array of arrays. + dnsNodes = Array.prototype.concat.apply([], dnsNodes); // Add hardcoded nodes to the end of list because DNS nodes should // be more up-to-date. - // Flatten array of array of arrays. - dnsNodes = Array.prototype.concat.apply([], dnsNodes); return dnsNodes.concat(hardcodedNodes); }); }; @@ -107,19 +205,10 @@ Transport.prototype.connect = function() { assert(!this._client, "Already connected"); assert(!this._server, "Already listening"); - var client = net.connect.apply(null, arguments); - this._setupClient(client); + this._client = net.connect.apply(null, arguments); + this._setupClient(); }; -// Unmap IPv4-mapped IPv6 addresses. -function unmap(addr) { - if (addr.indexOf("::ffff:") === 0) { - return addr.slice(7); - } else { - return addr; - } -} - Transport.prototype.listen = function() { assert(!this._client, "Already connected"); assert(!this._server, "Already listening"); @@ -128,27 +217,32 @@ Transport.prototype.listen = function() { var server = self._server = net.createServer(); server.listen.apply(server, arguments); - server.on("connection", function(sock) { - var addr = sock.remoteAddress; - var port = sock.remotePort; + // TODO(Kagami): We may want to specify some limits for number of + // connected users. + server.on("connection", function(client) { + var addr = client.remoteAddress; + var port = client.remotePort; if (self._clients[addr]) { // NOTE(Kagami): Doesn't allow more than one connection per IP. // This may obstruct people behind NAT but we copy PyBitmessage's // behavior here. - sock.end(); - self.emit("warning", addr + " was tried to connect once more"); + client.end(); + self.emit("warning", new Error( + addr + " was tried to create second connection" + )); return; } - self._clients[addr] = sock; - sock.on("close", function() { + self._clients[addr] = client; + client.on("close", function() { delete self._clients[addr]; }); - var transport = new self.constructor({ - client: sock, - seeds: self.seeds, - dnsSeeds: self.dnsSeeds, - }); + var transport = new self.constructor(objectAssign({}, self, { + _client: client, + _accepted: true, + })); self.emit("connection", transport, unmap(addr), port); + // Emit "open" manually because "connect" won't be emitted. + transport.emit("open"); }); server.on("error", function(err) { @@ -161,15 +255,24 @@ Transport.prototype.listen = function() { }); }; -Transport.prototype.send = function(data) { +function getmsg(args) { + if (typeof args[0] === "string") { + return structs.message.encode(args[0], args[1]); + } else { + return args[0]; + } +} + +Transport.prototype.send = function() { if (this._client) { - this._client.write(data); + this._client.write(getmsg(arguments)); } else { throw new Error("Not connected"); } }; -Transport.prototype.broadcast = function(data) { +Transport.prototype.broadcast = function() { + var data = getmsg(arguments); if (this._server) { Object.keys(this._clients).forEach(function(ip) { this._clients[ip].write(data); diff --git a/lib/structs.js b/lib/structs.js index 7e10a4c..8462a1e 100644 --- a/lib/structs.js +++ b/lib/structs.js @@ -215,9 +215,7 @@ var message = exports.message = { encode: function(command, payload) { assert(command.length <= 12, "Command is too long"); assert(isAscii(command), "Non-ASCII characters in command"); - if (!payload) { - payload = new Buffer(0); - } + payload = payload || new Buffer(0); assert(payload.length <= 1600003, "Message payload is too big"); var buf = new Buffer(24 + payload.length); buf.fill(0); diff --git a/tests/functional.js b/tests/functional.js index a21abea..06c8448 100644 --- a/tests/functional.js +++ b/tests/functional.js @@ -53,14 +53,31 @@ if (!process.browser) { it("should allow to interconnect two nodes", function(done) { tcp.connect(22333, "127.0.0.1"); - tcp.on("open", function() { + tcp.once("open", function() { done(); }); }); + it("should establish connection", function(done) { + tcp.once("established", function() { + done(); + }); + }); + + it("should allow to communicate", function(done) { + tcp.on("message", function cb(command, payload) { + if (command === "echo-res") { + expect(payload.toString()).to.equal("test"); + tcp.removeListener("message", cb); + done(); + } + }); + tcp.send("echo-req", Buffer("test")); + }); + it("should allow to close connection", function(done) { tcp.close(); - tcp.on("close", function() { + tcp.once("close", function() { done(); }); }); diff --git a/tests/tcp-node.js b/tests/tcp-node.js index fc2b871..b69ea15 100644 --- a/tests/tcp-node.js +++ b/tests/tcp-node.js @@ -1,8 +1,15 @@ var TcpTransport = require("../lib/net/tcp"); function start() { - var tcp = new TcpTransport(); - tcp.listen(22333, "127.0.0.1"); + var server = new TcpTransport(); + server.listen(22333, "127.0.0.1"); + server.on("connection", function(client) { + client.on("message", function(command, payload) { + if (command === "echo-req") { + client.send("echo-res", payload); + } + }); + }); } start();