From 84ab3ec3a048b0165499dedf3bf4173d929c17f1 Mon Sep 17 00:00:00 2001 From: Kagami Hiiragi Date: Thu, 5 Feb 2015 21:38:36 +0300 Subject: [PATCH] Minimal TCP node --- lib/net/base.js | 56 ++++++++++--------- lib/net/tcp.js | 117 +++++++++++++++++++++++++++++++++++++++- tests/functional.js | 27 +++++++++- tests/index.js | 2 +- tests/run-test-nodes.js | 36 +++++++++++++ tests/tcp-node.js | 8 +++ tests/ws-node.js | 0 7 files changed, 216 insertions(+), 30 deletions(-) create mode 100644 tests/tcp-node.js create mode 100644 tests/ws-node.js diff --git a/lib/net/base.js b/lib/net/base.js index 5dfff95..1174a3e 100644 --- a/lib/net/base.js +++ b/lib/net/base.js @@ -22,20 +22,14 @@ function BaseTransport() { inherits(BaseTransport, EventEmitter); -/** - * Seed nodes for this transport. Consist of `[host, port]` pairs. - * Note that this nodes shouldn't be advertised via `addr` messages. - * @const {Array.} - */ -BaseTransport.prototype.SEED_NODES = []; - /** * Do the transport-specific bootstrap process and return promise that * contains discovered nodes when fulfilled. * @return {Promise.} + * @abstract */ BaseTransport.prototype.bootstrap = function() { - return PPromise.resolve([].concat(this.SEED_NODES)); + return PPromise.reject(new Error("Not implemented")); }; /** @@ -48,24 +42,6 @@ BaseTransport.prototype.connect = function() { throw new Error("Not implemented"); }; -/** - * Send [message]{@link module:bitmessage/structs.message} over the - * wire. - * @param {Buffer} msg - Encoded message - * @abstract - */ -BaseTransport.prototype.send = function() { - throw new Error("Not implemented"); -}; - -/** - * Close connection. - * @abstract - */ -BaseTransport.prototype.close = function() { - throw new Error("Not implemented"); -}; - /** * Listen for the transport-specific incoming connections. * Should emit `connection` event with a transport instance for each new @@ -76,4 +52,32 @@ BaseTransport.prototype.listen = function() { throw new Error("Not implemented"); }; +/** + * Send [message]{@link module:bitmessage/structs.message} over the + * wire (client mode). + * @param {Buffer} msg - Encoded message + * @abstract + */ +BaseTransport.prototype.send = function() { + throw new Error("Not implemented"); +}; + +/** + * Send [message]{@link module:bitmessage/structs.message} to all + * connected clients (server mode). + * @param {Buffer} msg - Encoded message + * @abstract + */ +BaseTransport.prototype.broadcast = function() { + throw new Error("Not implemented"); +}; + +/** + * Close connection(s) and/or stop listening. + * @abstract + */ +BaseTransport.prototype.close = function() { + throw new Error("Not implemented"); +}; + exports.BaseTransport = BaseTransport; diff --git a/lib/net/tcp.js b/lib/net/tcp.js index bd57b12..d60b807 100644 --- a/lib/net/tcp.js +++ b/lib/net/tcp.js @@ -7,17 +7,132 @@ "use strict"; var inherits = require("inherits"); +var net = require("net"); +var assert = require("../_util").assert; +var PPromise = require("../platform").Promise; var BaseTransport = require("./base").BaseTransport; +var sockIdCounter = 0; + /** * TCP transport constructor. * @constructor * @static */ -function Transport() { +function Transport(opts) { Transport.super_.call(this); + opts = opts || {}; + if (opts.seeds) { + this.seeds = opts.seeds; + } + if (opts.client) { + this._setupClient(opts.client); + } + // To track connected clients in server mode. + this._clients = {}; } inherits(Transport, BaseTransport); +Transport.prototype._setupClient = function(client) { + var self = this; + self._client = client; + + // Set default transport timeout per spec. + client.setTimeout(20); + + client.on("connect", function() { + self.emit("open"); + }); + + client.on("data", function() { + }); + + client.on("timeout", function() { + client.end(); + }); + + client.on("error", function(err) { + self.emit("error", err); + }); + + client.on("close", function() { + self.emit("close"); + delete self._client; + }); +}; + +Transport.prototype.bootstrap = function() { + // TODO(Kagami): Think how to set up DNS/IP nodes. Do we need to + // hardcode them? +}; + +Transport.prototype.connect = function() { + assert(!this._client, "Already connected"); + assert(!this._server, "Already listening"); + + var client = net.connect.apply(null, arguments); + this._setupClient(client); +}; + +Transport.prototype.listen = function() { + assert(!this._client, "Already connected"); + assert(!this._server, "Already listening"); + + var self = this; + var server = self._server = net.createServer(); + server.listen.apply(server, arguments); + + server.on("connection", function(sock) { + sock.id = sockIdCounter++; + self._clients[sock.id] = sock; + sock.on("close", function() { + delete self._clients[sock.id]; + }); + var transport = new self.constructor({ + client: sock, + seeds: this.seeds, + }); + self.emit("connection", transport); + }); + + server.on("error", function(err) { + self.emit("error", err); + }); + + server.on("close", function() { + self.emit("close"); + delete self._server; + }); +}; + +Transport.prototype.send = function(data) { + if (this._client) { + this._client.write(data); + } else { + throw new Error("Not connected"); + } +}; + +Transport.prototype.broadcast = function(data) { + if (this._server) { + Object.keys(this._clients).forEach(function(id) { + this._clients[id].write(data); + }, this); + } else { + throw new Error("Not listening"); + } +}; + +Transport.prototype.close = function() { + if (this._client) { + this._client.end(); + } else if (this._server) { + Object.keys(this._clients).forEach(function(id) { + this._clients[id].end(); + }, this); + this._server.close(); + } +}; + exports.Transport = Transport; diff --git a/tests/functional.js b/tests/functional.js index d90d2f5..1dc6ac0 100644 --- a/tests/functional.js +++ b/tests/functional.js @@ -3,11 +3,34 @@ var structs = bitmessage.structs; var message = structs.message; var WsTransport = require("../lib/net/ws").Transport; +var TcpTransport, tcp; + if (!process.browser) { - var TcpTransport = require("../lib/net/tcp").Transport; + TcpTransport = require("../lib/net/tcp").Transport; describe("TCP transport", function() { - it("should allow to communicate between two nodes"); + before(function(done) { + tcp = new TcpTransport(); + tcp.on("error", function(err) { + console.log("TCP transport error:", err); + }); + // Wait some time for server. + setTimeout(done, 1000); + }); + + it("should allow to interconnect two nodes", function(done) { + tcp.connect(22333, "127.0.0.1"); + tcp.on("open", function() { + done(); + }); + }); + + it("should allow to close connection", function(done) { + tcp.close(); + tcp.on("close", function() { + done(); + }); + }); }); } diff --git a/tests/index.js b/tests/index.js index 0709cd4..dc06590 100644 --- a/tests/index.js +++ b/tests/index.js @@ -7,7 +7,7 @@ if (testMode !== "functional") { } if (testMode !== "unit") { - // For Browser tests nodes are runned from karma.conf.js because we + // For Browser tests nodes are being run from karma.conf.js because we // are _already_ in browser context here. if (!process.browser) require("./run-test-nodes.js")(); diff --git a/tests/run-test-nodes.js b/tests/run-test-nodes.js index dc5cb31..e246f1f 100644 --- a/tests/run-test-nodes.js +++ b/tests/run-test-nodes.js @@ -3,5 +3,41 @@ // Note that this file is executed only on Node.js platform. +var path = require("path"); +var child = require("child_process"); + +var TCP_NODE_PATH = path.join(__dirname, "tcp-node.js"); +var WS_NODE_PATH = path.join(__dirname, "ws-node.js"); + +function spawn(path) { + var p = child.spawn("node", [path]); + p.stderr.on("data", function(err) { + console.log("Error from", path, ":", err.toString()); + }); + return p; +} + module.exports = function() { + function cleanup(doExit) { + return function(err) { + try { + tcpNode.kill("SIGKILL"); + } catch(e) { + console.log(e.stack); + } + try { + wsNode.kill("SIGKILL"); + } catch(e) { + console.log(e.stack); + } + if (err) console.log(err.stack); + if (doExit) process.exit(1); + }; + } + + var tcpNode = spawn(TCP_NODE_PATH); + var wsNode = spawn(WS_NODE_PATH); + process.on("exit", cleanup()); + process.on("SIGINT", cleanup(true)); + process.on("uncaughtException", cleanup(true)); }; diff --git a/tests/tcp-node.js b/tests/tcp-node.js new file mode 100644 index 0000000..a3ebf76 --- /dev/null +++ b/tests/tcp-node.js @@ -0,0 +1,8 @@ +var TcpTransport = require("../lib/net/tcp").Transport; + +function start() { + var tcp = new TcpTransport(); + tcp.listen(22333, "127.0.0.1"); +} + +start(); diff --git a/tests/ws-node.js b/tests/ws-node.js new file mode 100644 index 0000000..e69de29