Minimal TCP node

This commit is contained in:
Kagami Hiiragi 2015-02-05 21:38:36 +03:00
parent 0fd8fb595f
commit 84ab3ec3a0
7 changed files with 216 additions and 30 deletions

View File

@ -22,20 +22,14 @@ function BaseTransport() {
inherits(BaseTransport, EventEmitter);
/**
* Seed nodes for this transport. Consist of `[host, port]` pairs.
* Note that this nodes shouldn't be advertised via `addr` messages.
* @const {Array.}
*/
BaseTransport.prototype.SEED_NODES = [];
/**
* Do the transport-specific bootstrap process and return promise that
* contains discovered nodes when fulfilled.
* @return {Promise.<Array.>}
* @abstract
*/
BaseTransport.prototype.bootstrap = function() {
return PPromise.resolve([].concat(this.SEED_NODES));
return PPromise.reject(new Error("Not implemented"));
};
/**
@ -48,24 +42,6 @@ BaseTransport.prototype.connect = function() {
throw new Error("Not implemented");
};
/**
* Send [message]{@link module:bitmessage/structs.message} over the
* wire.
* @param {Buffer} msg - Encoded message
* @abstract
*/
BaseTransport.prototype.send = function() {
throw new Error("Not implemented");
};
/**
* Close connection.
* @abstract
*/
BaseTransport.prototype.close = function() {
throw new Error("Not implemented");
};
/**
* Listen for the transport-specific incoming connections.
* Should emit `connection` event with a transport instance for each new
@ -76,4 +52,32 @@ BaseTransport.prototype.listen = function() {
throw new Error("Not implemented");
};
/**
* Send [message]{@link module:bitmessage/structs.message} over the
* wire (client mode).
* @param {Buffer} msg - Encoded message
* @abstract
*/
BaseTransport.prototype.send = function() {
throw new Error("Not implemented");
};
/**
* Send [message]{@link module:bitmessage/structs.message} to all
* connected clients (server mode).
* @param {Buffer} msg - Encoded message
* @abstract
*/
BaseTransport.prototype.broadcast = function() {
throw new Error("Not implemented");
};
/**
* Close connection(s) and/or stop listening.
* @abstract
*/
BaseTransport.prototype.close = function() {
throw new Error("Not implemented");
};
exports.BaseTransport = BaseTransport;

View File

@ -7,17 +7,132 @@
"use strict";
var inherits = require("inherits");
var net = require("net");
var assert = require("../_util").assert;
var PPromise = require("../platform").Promise;
var BaseTransport = require("./base").BaseTransport;
var sockIdCounter = 0;
/**
* TCP transport constructor.
* @constructor
* @static
*/
function Transport() {
function Transport(opts) {
Transport.super_.call(this);
opts = opts || {};
if (opts.seeds) {
this.seeds = opts.seeds;
}
if (opts.client) {
this._setupClient(opts.client);
}
// To track connected clients in server mode.
this._clients = {};
}
inherits(Transport, BaseTransport);
Transport.prototype._setupClient = function(client) {
var self = this;
self._client = client;
// Set default transport timeout per spec.
client.setTimeout(20);
client.on("connect", function() {
self.emit("open");
});
client.on("data", function() {
});
client.on("timeout", function() {
client.end();
});
client.on("error", function(err) {
self.emit("error", err);
});
client.on("close", function() {
self.emit("close");
delete self._client;
});
};
Transport.prototype.bootstrap = function() {
// TODO(Kagami): Think how to set up DNS/IP nodes. Do we need to
// hardcode them?
};
Transport.prototype.connect = function() {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
var client = net.connect.apply(null, arguments);
this._setupClient(client);
};
Transport.prototype.listen = function() {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
var self = this;
var server = self._server = net.createServer();
server.listen.apply(server, arguments);
server.on("connection", function(sock) {
sock.id = sockIdCounter++;
self._clients[sock.id] = sock;
sock.on("close", function() {
delete self._clients[sock.id];
});
var transport = new self.constructor({
client: sock,
seeds: this.seeds,
});
self.emit("connection", transport);
});
server.on("error", function(err) {
self.emit("error", err);
});
server.on("close", function() {
self.emit("close");
delete self._server;
});
};
Transport.prototype.send = function(data) {
if (this._client) {
this._client.write(data);
} else {
throw new Error("Not connected");
}
};
Transport.prototype.broadcast = function(data) {
if (this._server) {
Object.keys(this._clients).forEach(function(id) {
this._clients[id].write(data);
}, this);
} else {
throw new Error("Not listening");
}
};
Transport.prototype.close = function() {
if (this._client) {
this._client.end();
} else if (this._server) {
Object.keys(this._clients).forEach(function(id) {
this._clients[id].end();
}, this);
this._server.close();
}
};
exports.Transport = Transport;

View File

@ -3,11 +3,34 @@ var structs = bitmessage.structs;
var message = structs.message;
var WsTransport = require("../lib/net/ws").Transport;
var TcpTransport, tcp;
if (!process.browser) {
var TcpTransport = require("../lib/net/tcp").Transport;
TcpTransport = require("../lib/net/tcp").Transport;
describe("TCP transport", function() {
it("should allow to communicate between two nodes");
before(function(done) {
tcp = new TcpTransport();
tcp.on("error", function(err) {
console.log("TCP transport error:", err);
});
// Wait some time for server.
setTimeout(done, 1000);
});
it("should allow to interconnect two nodes", function(done) {
tcp.connect(22333, "127.0.0.1");
tcp.on("open", function() {
done();
});
});
it("should allow to close connection", function(done) {
tcp.close();
tcp.on("close", function() {
done();
});
});
});
}

View File

@ -7,7 +7,7 @@ if (testMode !== "functional") {
}
if (testMode !== "unit") {
// For Browser tests nodes are runned from karma.conf.js because we
// For Browser tests nodes are being run from karma.conf.js because we
// are _already_ in browser context here.
if (!process.browser) require("./run-test-nodes.js")();

View File

@ -3,5 +3,41 @@
// Note that this file is executed only on Node.js platform.
var path = require("path");
var child = require("child_process");
var TCP_NODE_PATH = path.join(__dirname, "tcp-node.js");
var WS_NODE_PATH = path.join(__dirname, "ws-node.js");
function spawn(path) {
var p = child.spawn("node", [path]);
p.stderr.on("data", function(err) {
console.log("Error from", path, ":", err.toString());
});
return p;
}
module.exports = function() {
function cleanup(doExit) {
return function(err) {
try {
tcpNode.kill("SIGKILL");
} catch(e) {
console.log(e.stack);
}
try {
wsNode.kill("SIGKILL");
} catch(e) {
console.log(e.stack);
}
if (err) console.log(err.stack);
if (doExit) process.exit(1);
};
}
var tcpNode = spawn(TCP_NODE_PATH);
var wsNode = spawn(WS_NODE_PATH);
process.on("exit", cleanup());
process.on("SIGINT", cleanup(true));
process.on("uncaughtException", cleanup(true));
};

8
tests/tcp-node.js Normal file
View File

@ -0,0 +1,8 @@
var TcpTransport = require("../lib/net/tcp").Transport;
function start() {
var tcp = new TcpTransport();
tcp.listen(22333, "127.0.0.1");
}
start();

0
tests/ws-node.js Normal file
View File