From 084ddc8084f3b6ee115307740afaf5c4bc108dff Mon Sep 17 00:00:00 2001 From: Kagami Hiiragi Date: Tue, 10 Feb 2015 00:56:55 +0300 Subject: [PATCH] WebSocket transport --- README.md | 2 +- lib/messages.js | 3 +- lib/net/base.js | 20 ++++ lib/net/tcp.js | 62 ++++--------- lib/net/ws.browser.js | 107 ++++++++++++++++++--- lib/net/ws.js | 210 ++++++++++++++++++++++++++++++++++++++++-- tests/functional.js | 65 ++++++++++++- tests/tcp-node.js | 2 + tests/ws-node.js | 15 +++ 9 files changed, 415 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index d4b7abe..b9fba36 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ API documentation is available [here](https://bitchan.github.io/bitmessage/docs/ - [x] UserAgent - [ ] Network transports - [x] TCP (Node.js only) - - [ ] WebSocket + - [x] WebSocket - [ ] WebRTC - [ ] Parse PyBitmessage configs - [ ] keys.dat diff --git a/lib/messages.js b/lib/messages.js index a14d132..141b725 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -25,8 +25,7 @@ var ServicesBitfield = structs.ServicesBitfield; * Note that this function doesn't do any validation because it is * already provided by * [message.decode]{@link module:bitmessage/structs.message.decode} - * routine. Normally you call this for each incoming message and then - * call decode function of the appropriate message handler. + * routine. * @param {Buffer} buf - Buffer that starts with encoded message * @return {?string} Message's command if any. */ diff --git a/lib/net/base.js b/lib/net/base.js index 0d03502..b23a05c 100644 --- a/lib/net/base.js +++ b/lib/net/base.js @@ -10,6 +10,7 @@ var inherits = require("inherits"); var EventEmitter = require("events").EventEmitter; var PPromise = require("../platform").Promise; +var structs = require("../structs"); /** * Network transport base class. @@ -84,4 +85,23 @@ BaseTransport.prototype.close = function() { throw new Error("Not implemented"); }; +// Static helpers. + +BaseTransport._getmsg = function(args) { + if (typeof args[0] === "string") { + return structs.message.encode(args[0], args[1]); + } else { + return args[0]; + } +}; + +// Unmap IPv4-mapped IPv6 address. +BaseTransport._unmap = function(addr) { + if (addr.slice(0, 7) === "::ffff:") { + return addr.slice(7); + } else { + return addr; + } +}; + module.exports = BaseTransport; diff --git a/lib/net/tcp.js b/lib/net/tcp.js index 4306d5d..e9fc4eb 100644 --- a/lib/net/tcp.js +++ b/lib/net/tcp.js @@ -16,6 +16,9 @@ var structs = require("../structs"); var messages = require("../messages"); var BaseTransport = require("./base"); +var getmsg = BaseTransport._getmsg; +var unmap = BaseTransport._unmap; + /** * TCP transport constructor. * @constructor @@ -27,20 +30,12 @@ function Transport(opts) { this.seeds = this.seeds || []; this.dnsSeeds = this.dnsSeeds || []; this._clients = {}; - if (this._client) { - this._setupClient(); - } } inherits(Transport, BaseTransport); -// Unmap IPv4-mapped IPv6 address. -function unmap(addr) { - if (addr.slice(0, 7) === "::ffff:") { - return addr.slice(7); - } else { - return addr; - } +function getfrom(client) { + return unmap(client.remoteAddress) + ":" + client.remotePort; } Transport.prototype.sendVersion = function() { @@ -54,9 +49,9 @@ Transport.prototype.sendVersion = function() { })); }; -Transport.prototype._setupClient = function() { +Transport.prototype._setupClient = function(client, accepted) { var self = this; - var client = self._client; + self._client = client; var cache = Buffer(0); var decoded; var verackSent = false; @@ -69,10 +64,10 @@ Transport.prototype._setupClient = function() { client.setTimeout(20000); client.on("connect", function() { - self.emit("open"); // NOTE(Kagami): This handler shouldn't be called at all for // accepted sockets but let's be sure. - if (!self._accepted) { + if (!accepted) { + self.emit("open"); self.sendVersion(); } }); @@ -100,9 +95,7 @@ Transport.prototype._setupClient = function() { // TODO(Kagami): Send `error` message and ban node for some time // if there were too many errors? self.emit("warning", new Error( - "Message decoding error from " + - unmap(client.remoteAddress) + ":" + client.remotePort, - ": " + + "Message decoding error from " + getfrom(client) + ": " + decoded.error )); } @@ -119,10 +112,9 @@ Transport.prototype._setupClient = function() { } self.send("verack"); verackSent = true; - if (self._accepted) { + if (accepted) { self.sendVersion(); - } - if (verackReceived) { + } else if (verackReceived) { self.emit("established"); } } else if (command === "verack") { @@ -152,9 +144,6 @@ Transport.prototype._setupClient = function() { client.on("close", function() { self.emit("close"); delete self._client; - verackSent = false; - verackReceived = false; - established = false; }); }; @@ -204,9 +193,7 @@ Transport.prototype.bootstrap = function() { Transport.prototype.connect = function() { assert(!this._client, "Already connected"); assert(!this._server, "Already listening"); - - this._client = net.connect.apply(null, arguments); - this._setupClient(); + this._setupClient(net.connect.apply(null, arguments)); }; Transport.prototype.listen = function() { @@ -227,21 +214,20 @@ Transport.prototype.listen = function() { // This may obstruct people behind NAT but we copy PyBitmessage's // behavior here. client.end(); - self.emit("warning", new Error( - addr + " was tried to create second connection" + return self.emit("warning", new Error( + unmap(addr) + " was tried to create second connection" )); - return; } self._clients[addr] = client; client.on("close", function() { delete self._clients[addr]; }); - var transport = new self.constructor(objectAssign({}, self, { - _client: client, - _accepted: true, - })); + var transport = new self.constructor(self); + var accepted = true; + transport._setupClient(client, accepted); self.emit("connection", transport, unmap(addr), port); - // Emit "open" manually because "connect" won't be emitted. + // Emit "open" manually because "connect" for this socket won't be + // fired. transport.emit("open"); }); @@ -255,14 +241,6 @@ Transport.prototype.listen = function() { }); }; -function getmsg(args) { - if (typeof args[0] === "string") { - return structs.message.encode(args[0], args[1]); - } else { - return args[0]; - } -} - Transport.prototype.send = function() { if (this._client) { this._client.write(getmsg(arguments)); diff --git a/lib/net/ws.browser.js b/lib/net/ws.browser.js index d76e80f..3daab67 100644 --- a/lib/net/ws.browser.js +++ b/lib/net/ws.browser.js @@ -1,29 +1,112 @@ /** - * WebSocket transport. Generally needed because browsers can't handle - * TCP sockets so we proxy messages from clients via WebSocket into TCP - * data packets. + * WebSocket transport. Used in browser in client-mode only. Server + * handle incoming messages and wrap them into TCP data packets. */ "use strict"; +var objectAssign = Object.assign || require("object-assign"); var inherits = require("inherits"); var assert = require("../_util").assert; +var structs = require("../structs"); +var messages = require("../messages"); var BaseTransport = require("./base"); -/** - * WebSocket transport constructor. - * @constructor - * @static - */ -function Transport() { +function Transport(opts) { Transport.super_.call(this); + objectAssign(this, opts); + this.seeds = this.seeds || []; } inherits(Transport, BaseTransport); -Transport.prototype.connect = function(opts) { - assert(!this._client, "Already connected"); - this._client = new WebSocket(opts); +Transport.prototype.bootstrap = function() { + return Promise.resolve([].concat(this.seeds)); +}; + +Transport.prototype.connect = function(url, protocols) { + var self = this; + assert(!self._client, "Already connected"); + + // TODO(Kagami): Handle timeouts! + var client = self._client = new WebSocket(url, protocols); + client.binaryType = "arraybuffer"; + var verackSent = false; + var verackReceived = false; + var established = false; + + client.onopen = function() { + self.emit("open"); + self.send(messages.version.encode({ + services: self.services, + userAgent: self.userAgent, + streamNumbers: self.streamNumbers, + // This parameters are not used by the remote node so we fake them + // (because we can't resolve domain name in Browser). + remoteHost: "127.0.0.1", + remotePort: 8444, + })); + }; + + client.onmessage = function(e) { + var buf = new Buffer(new Uint8Array(e.data)); + var decoded; + try { + decoded = structs.message.decode(buf); + } catch (err) { + return self.emit("warning", new Error( + "Message decoding error from " + url + ": " + err + )); + } + self.emit("message", decoded.command, decoded.payload, decoded); + }; + + // High-level message processing. + self.on("message", function(command) { + if (!established) { + // TODO: Process version data. + if (command === "version") { + if (verackSent) { + return; + } + self.send("verack"); + verackSent = true; + if (verackReceived) { + established = true; + self.emit("established"); + } + } else if (command === "verack") { + verackReceived = true; + if (verackSent) { + established = true; + self.emit("established"); + } + } + } + }); + + client.onerror = function(err) { + self.emit("error", err); + }; + + client.onclose = function() { + self.emit("close"); + delete self._client; + }; +}; + +Transport.prototype.send = function() { + if (this._client) { + this._client.send(BaseTransport._getmsg(arguments)); + } else { + throw new Error("Not connected"); + } +}; + +Transport.prototype.close = function() { + if (this._client) { + this._client.close(); + } }; module.exports = Transport; diff --git a/lib/net/ws.js b/lib/net/ws.js index dc552cf..737c51c 100644 --- a/lib/net/ws.js +++ b/lib/net/ws.js @@ -1,38 +1,230 @@ /** - * WebSocket transport. Generally needed because browsers can't handle - * TCP sockets so we proxy messages from clients via WebSocket into TCP - * data packets. + * WebSocket transport. Needed because browsers can't handle TCP sockets + * so we use separate WebSocket server to proxy messages into TCP data + * packets. * @module bitmessage/net/ws */ "use strict"; +var objectAssign = Object.assign || require("object-assign"); var inherits = require("inherits"); var WebSocket = require("ws"); // jshint ignore:line var assert = require("../_util").assert; +var PPromise = require("../platform").Promise; +var structs = require("../structs"); +var messages = require("../messages"); var BaseTransport = require("./base"); var WebSocketServer = WebSocket.Server; +var getmsg = BaseTransport._getmsg; +var unmap = BaseTransport._unmap; /** * WebSocket transport constructor. * @constructor * @static */ -function Transport() { +function Transport(opts) { Transport.super_.call(this); + objectAssign(this, opts); + this.seeds = this.seeds || []; } inherits(Transport, BaseTransport); -Transport.prototype.connect = function(opts) { - assert(!this._client, "Already connected"); - this._client = new WebSocket(opts); +function getfrom(client) { + return unmap(client._socket.remoteAddress) + ":" + client._socket.remotePort; +} + +Transport.prototype.sendVersion = function() { + return this.send(messages.version.encode({ + services: this.services, + userAgent: this.userAgent, + streamNumbers: this.streamNumbers, + port: this.port, + remoteHost: this._client._socket.remoteAddress, + remotePort: this._client._socket.remotePort, + })); }; -Transport.prototype.listen = function(opts) { +Transport.prototype._handleTimeout = function() { + var client = this._client; + // TODO(Kagami): We may also want to close connection if it wasn't + // established within minute. + client._socket.setTimeout(20000); + client._socket.on("timeout", function() { + client.close(); + }); + this.on("established", function() { + // Raise timeout up to 10 minutes per spec. + // TODO(Kagami): Send ping frame every 5 minutes as PyBitmessage. + client._socket.setTimeout(600000); + }); +}; + +Transport.prototype._setupClient = function(client, accepted) { + var self = this; + self._client = client; + var verackSent = false; + var verackReceived = false; + var established = false; + + client.on("open", function() { + // NOTE(Kagami): This handler shouldn't be called at all for + // accepted sockets but let's be sure. + if (!accepted) { + // NOTE(Kagami): We may set timeout only after connection was + // opened because socket may not yet be available when + // `_setupClient` is called. + self._handleTimeout(); + self.emit("open"); + self.sendVersion(); + } + }); + + client.on("message", function(data, flags) { + var decoded; + if (!flags.binary) { + // TODO(Kagami): Send `error` message and ban node for some time + // if there were too many errors? + return self.emit("warning", new Error( + "Peer " + getfrom(client) + " sent non-binary data" + )); + } + try { + decoded = structs.message.decode(data); + } catch (err) { + return self.emit("warning", new Error( + "Message decoding error from " + getfrom(client) + ": " + err + )); + } + self.emit("message", decoded.command, decoded.payload, decoded); + }); + + // High-level message processing. + self.on("message", function(command) { + if (!established) { + // TODO: Process version data. + if (command === "version") { + if (verackSent) { + return; + } + self.send("verack"); + verackSent = true; + if (accepted) { + self.sendVersion(); + } else if (verackReceived) { + established = true; + self.emit("established"); + } + } else if (command === "verack") { + verackReceived = true; + if (verackSent) { + established = true; + self.emit("established"); + } + } + } + }); + + client.on("error", function(err) { + self.emit("error", err); + }); + + client.on("close", function() { + self.emit("close"); + delete self._client; + }); +}; + +Transport.prototype.bootstrap = function() { + return PPromise.resolve([].concat(this.seeds)); +}; + +Transport.prototype.connect = function(address, protocols, options) { + assert(!this._client, "Already connected"); assert(!this._server, "Already listening"); - this._server = new WebSocketServer(opts); + // `new` doesn't work with `apply`, so passing all possible arguments + // manually. + this._setupClient(new WebSocket(address, protocols, options)); +}; + +Transport.prototype.listen = function(options, callback) { + assert(!this._client, "Already connected"); + assert(!this._server, "Already listening"); + + var self = this; + var server = self._server = new WebSocketServer(options, callback); + + // TODO(Kagami): We may want to specify some limits for number of + // connected users. + server.on("connection", function(client) { + var addr = client._socket.remoteAddress; + var port = client._remotePort; + var i; + + // NOTE(Kagami): O(n) search because `clients` array is already + // provided by `ws` library. We may want to optmize it though and + // also disable `clientTracking` option. + for (i = 0; i < server.clients.length; i++) { + if (server.clients[i] !== client && + server.clients[i]._socket.remoteAddress === addr) { + // NOTE(Kagami): Doesn't allow more than one connection per IP. + // This may obstruct people behind NAT but we copy + // PyBitmessage's behavior here. + client.close(); + return self.emit("warning", new Error( + unmap(addr) + " was tried to create second connection" + )); + } + } + + var transport = new self.constructor(self); + var accepted = true; + transport._setupClient(client, accepted); + transport._handleTimeout(); + self.emit("connection", transport, unmap(addr), port); + // Emit "open" manually because it won't be fired for already opened + // socket. + transport.emit("open"); + }); + + server.on("error", function(err) { + self.emit("error", err); + }); +}; + +Transport.prototype.send = function() { + if (this._client) { + // TODO(Kagami): `mask: true` doesn't work with Chromium 40. File a + // bug to ws bugtracker. + this._client.send(getmsg(arguments), {binary: true}); + } else { + throw new Error("Not connected"); + } +}; + +Transport.prototype.broadcast = function() { + var data = getmsg(arguments); + if (this._server) { + this._server.clients.forEach(function(client) { + client.send(data, {binary: true}); + }); + } else { + throw new Error("Not listening"); + } +}; + +Transport.prototype.close = function() { + if (this._client) { + this._client.close(); + } else if (this._server) { + // `ws` server terminates immediately without any events. + this._server.close(); + this.emit("close"); + delete this._server; + } }; module.exports = Transport; diff --git a/tests/functional.js b/tests/functional.js index 06c8448..344ad67 100644 --- a/tests/functional.js +++ b/tests/functional.js @@ -14,10 +14,13 @@ if (!process.browser) { before(function(done) { tcp = new TcpTransport(); tcp.on("error", function(err) { - console.log("TCP transport error:", err); + console.log("TCP transport error: " + err); }); - // Wait some time for server. - setTimeout(done, 1000); + tcp.on("warning", function(warn) { + console.log("TCP transport warning: " + warn); + }); + // Wait some time for the server. + setTimeout(done, 300); }); it("should return nothing on bootstrap by default", function() { @@ -58,7 +61,7 @@ if (!process.browser) { }); }); - it("should establish connection", function(done) { + it("should automatically establish connection", function(done) { tcp.once("established", function() { done(); }); @@ -85,5 +88,57 @@ if (!process.browser) { } describe("WebSocket transport", function() { - it("should allow to communicate between two nodes"); + var ws; + + before(function(done) { + ws = new WsTransport(); + ws.on("error", function(err) { + console.log("WebSocket transport error: " + err); + }); + ws.on("warning", function(warn) { + console.log("WebSocket transport warning: " + warn); + }); + // Wait some time for the server. + setTimeout(done, 300); + }); + + it("should return hardcoded seeds on bootstrap", function() { + var ws2 = new WsTransport({seeds: [["ws.example.com", 8080]]}); + return ws2.bootstrap().then(function(nodes) { + expect(nodes).to.have.length(1); + expect(nodes[0][0]).to.be.equal("ws.example.com"); + expect(nodes[0][1]).to.be.equal(8080); + }); + }); + + it("should allow to interconnect two nodes", function(done) { + ws.connect("ws://127.0.0.1:22334"); + ws.once("open", function() { + done(); + }); + }); + + it("should automatically establish connection", function(done) { + ws.once("established", function() { + done(); + }); + }); + + it("should allow to communicate", function(done) { + ws.on("message", function cb(command, payload) { + if (command === "echo-res") { + expect(payload.toString()).to.equal("test"); + ws.removeListener("message", cb); + done(); + } + }); + ws.send("echo-req", Buffer("test")); + }); + + it("should allow to close connection", function(done) { + ws.close(); + ws.once("close", function() { + done(); + }); + }); }); diff --git a/tests/tcp-node.js b/tests/tcp-node.js index b69ea15..76c71c6 100644 --- a/tests/tcp-node.js +++ b/tests/tcp-node.js @@ -2,6 +2,8 @@ var TcpTransport = require("../lib/net/tcp"); function start() { var server = new TcpTransport(); + // In node 0.12/io 1.0 we can use {host: x, port: y} syntax so it'll + // be more compatible with the ws transport options. server.listen(22333, "127.0.0.1"); server.on("connection", function(client) { client.on("message", function(command, payload) { diff --git a/tests/ws-node.js b/tests/ws-node.js index e69de29..55ca7a2 100644 --- a/tests/ws-node.js +++ b/tests/ws-node.js @@ -0,0 +1,15 @@ +var WsTransport = require("../lib/net/ws"); + +function start() { + var server = new WsTransport(); + server.listen({host: "127.0.0.1", port: 22334}); + server.on("connection", function(client) { + client.on("message", function(command, payload) { + if (command === "echo-req") { + client.send("echo-res", payload); + } + }); + }); +} + +start();