Almost finished TCP transport

This commit is contained in:
Kagami Hiiragi 2015-02-09 17:38:10 +03:00
parent a2fc5af597
commit 142cb76e6b
7 changed files with 182 additions and 50 deletions

View File

@ -55,7 +55,7 @@ API documentation is available [here](https://bitchan.github.io/bitmessage/docs/
- [x] Address
- [x] UserAgent
- [ ] Network transports
- [ ] TCP (Node.js only)
- [x] TCP (Node.js only)
- [ ] WebSocket
- [ ] WebRTC
- [ ] Parse PyBitmessage configs

View File

@ -54,6 +54,8 @@ var randomNonce = bmcrypto.randomBytes(8);
* @namespace
* @static
*/
// TODO(Kagami): User agent and stream numbers size limits per
// <https://github.com/Bitmessage/PyBitmessage/issues/767>.
var version = exports.version = {
/**
* Decode `version` message.
@ -127,6 +129,7 @@ var version = exports.version = {
var time = opts.time || new Date();
var nonce = opts.nonce || randomNonce;
assert(nonce.length === 8, "Bad nonce");
var port = opts.port || 8444;
var userAgent = opts.userAgent || UserAgent.SELF;
var streamNumbers = opts.streamNumbers || [1];
// Start encoding.
@ -141,7 +144,7 @@ var version = exports.version = {
var addrFrom = structs.net_addr.encode({
services: services,
host: "127.0.0.1",
port: opts.port,
port: port,
short: true,
});
return Buffer.concat([

View File

@ -55,7 +55,9 @@ BaseTransport.prototype.listen = function() {
/**
* Send [message]{@link module:bitmessage/structs.message} over the
* wire (client mode).
* @param {Buffer} msg - Encoded message
* @param {(Buffer|string)} msg - Encoded message or command string
* @param (?Buffer} payload - Message payload (used if the first
* argument is a string)
* @abstract
*/
BaseTransport.prototype.send = function() {
@ -65,7 +67,9 @@ BaseTransport.prototype.send = function() {
/**
* Send [message]{@link module:bitmessage/structs.message} to all
* connected clients (server mode).
* @param {Buffer} msg - Encoded message
* @param {(Buffer|string)} msg - Encoded message or command string
* @param (?Buffer} payload - Message payload (used if the first
* argument is a string)
* @abstract
*/
BaseTransport.prototype.broadcast = function() {

View File

@ -6,11 +6,14 @@
"use strict";
var objectAssign = Object.assign || require("object-assign");
var inherits = require("inherits");
var net = require("net");
var dns = require("dns");
var assert = require("../_util").assert;
var PPromise = require("../platform").Promise;
var structs = require("../structs");
var messages = require("../messages");
var BaseTransport = require("./base");
/**
@ -20,30 +23,122 @@ var BaseTransport = require("./base");
*/
function Transport(opts) {
Transport.super_.call(this);
opts = opts || {};
this.seeds = opts.seeds || [];
this.dnsSeeds = opts.dnsSeeds || [];
if (opts.client) {
this._setupClient(opts.client);
}
// To track connected clients in server mode.
objectAssign(this, opts);
this.seeds = this.seeds || [];
this.dnsSeeds = this.dnsSeeds || [];
this._clients = {};
if (this._client) {
this._setupClient();
}
}
inherits(Transport, BaseTransport);
Transport.prototype._setupClient = function(client) {
// Unmap IPv4-mapped IPv6 address.
function unmap(addr) {
if (addr.slice(0, 7) === "::ffff:") {
return addr.slice(7);
} else {
return addr;
}
}
Transport.prototype.sendVersion = function() {
return this.send(messages.version.encode({
services: this.services,
userAgent: this.userAgent,
streamNumbers: this.streamNumbers,
port: this.port,
remoteHost: this._client.remoteAddress,
remotePort: this._client.remotePort,
}));
};
Transport.prototype._setupClient = function() {
var self = this;
self._client = client;
var client = self._client;
var cache = Buffer(0);
var decoded;
var verackSent = false;
var verackReceived = false;
var established = false;
// Set default transport timeout per spec.
client.setTimeout(20);
// TODO(Kagami): We may also want to close connection if it wasn't
// established within minute.
client.setTimeout(20000);
client.on("connect", function() {
self.emit("open");
// NOTE(Kagami): This handler shouldn't be called at all for
// accepted sockets but let's be sure.
if (!self._accepted) {
self.sendVersion();
}
});
client.on("data", function() {
client.on("data", function(data) {
// TODO(Kagami): We may want to preallocate 1.6M buffer for each
// client instead (max size of the message) to not constantly
// allocate new buffers. Though this may lead to another issues: too
// many memory per client.
cache = Buffer.concat([cache, data]);
while (true) {
decoded = structs.message.tryDecode(cache);
if (!decoded) {
break;
}
cache = decoded.rest;
if (decoded.message) {
self.emit(
"message",
decoded.message.command,
decoded.message.payload,
decoded.message);
} else if (decoded.error) {
// TODO(Kagami): Wrap it in custom error class?
// TODO(Kagami): Send `error` message and ban node for some time
// if there were too many errors?
self.emit("warning", new Error(
"Message decoding error from " +
unmap(client.remoteAddress) + ":" + client.remotePort,
": " +
decoded.error
));
}
}
});
// High-level message processing.
self.on("message", function(command) {
if (!established) {
// TODO: Process version data.
if (command === "version") {
if (verackSent) {
return;
}
self.send("verack");
verackSent = true;
if (self._accepted) {
self.sendVersion();
}
if (verackReceived) {
self.emit("established");
}
} else if (command === "verack") {
verackReceived = true;
if (verackSent) {
self.emit("established");
}
}
}
});
self.on("established", function() {
established = true;
// Raise timeout up to 10 minutes per spec.
// TODO(Kagami): Send pong messages every 5 minutes as PyBitmessage.
client.setTimeout(600000);
});
client.on("timeout", function() {
@ -57,6 +152,9 @@ Transport.prototype._setupClient = function(client) {
client.on("close", function() {
self.emit("close");
delete self._client;
verackSent = false;
verackReceived = false;
established = false;
});
};
@ -95,10 +193,10 @@ Transport.prototype.bootstrap = function() {
// FIXME(Kagami): Filter incorrect/private IP range nodes?
// See also: <https://github.com/Bitmessage/PyBitmessage/issues/768>.
return PPromise.all(promises).then(function(dnsNodes) {
// Flatten array of arrays.
dnsNodes = Array.prototype.concat.apply([], dnsNodes);
// Add hardcoded nodes to the end of list because DNS nodes should
// be more up-to-date.
// Flatten array of array of arrays.
dnsNodes = Array.prototype.concat.apply([], dnsNodes);
return dnsNodes.concat(hardcodedNodes);
});
};
@ -107,19 +205,10 @@ Transport.prototype.connect = function() {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
var client = net.connect.apply(null, arguments);
this._setupClient(client);
this._client = net.connect.apply(null, arguments);
this._setupClient();
};
// Unmap IPv4-mapped IPv6 addresses.
function unmap(addr) {
if (addr.indexOf("::ffff:") === 0) {
return addr.slice(7);
} else {
return addr;
}
}
Transport.prototype.listen = function() {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
@ -128,27 +217,32 @@ Transport.prototype.listen = function() {
var server = self._server = net.createServer();
server.listen.apply(server, arguments);
server.on("connection", function(sock) {
var addr = sock.remoteAddress;
var port = sock.remotePort;
// TODO(Kagami): We may want to specify some limits for number of
// connected users.
server.on("connection", function(client) {
var addr = client.remoteAddress;
var port = client.remotePort;
if (self._clients[addr]) {
// NOTE(Kagami): Doesn't allow more than one connection per IP.
// This may obstruct people behind NAT but we copy PyBitmessage's
// behavior here.
sock.end();
self.emit("warning", addr + " was tried to connect once more");
client.end();
self.emit("warning", new Error(
addr + " was tried to create second connection"
));
return;
}
self._clients[addr] = sock;
sock.on("close", function() {
self._clients[addr] = client;
client.on("close", function() {
delete self._clients[addr];
});
var transport = new self.constructor({
client: sock,
seeds: self.seeds,
dnsSeeds: self.dnsSeeds,
});
var transport = new self.constructor(objectAssign({}, self, {
_client: client,
_accepted: true,
}));
self.emit("connection", transport, unmap(addr), port);
// Emit "open" manually because "connect" won't be emitted.
transport.emit("open");
});
server.on("error", function(err) {
@ -161,15 +255,24 @@ Transport.prototype.listen = function() {
});
};
Transport.prototype.send = function(data) {
function getmsg(args) {
if (typeof args[0] === "string") {
return structs.message.encode(args[0], args[1]);
} else {
return args[0];
}
}
Transport.prototype.send = function() {
if (this._client) {
this._client.write(data);
this._client.write(getmsg(arguments));
} else {
throw new Error("Not connected");
}
};
Transport.prototype.broadcast = function(data) {
Transport.prototype.broadcast = function() {
var data = getmsg(arguments);
if (this._server) {
Object.keys(this._clients).forEach(function(ip) {
this._clients[ip].write(data);

View File

@ -215,9 +215,7 @@ var message = exports.message = {
encode: function(command, payload) {
assert(command.length <= 12, "Command is too long");
assert(isAscii(command), "Non-ASCII characters in command");
if (!payload) {
payload = new Buffer(0);
}
payload = payload || new Buffer(0);
assert(payload.length <= 1600003, "Message payload is too big");
var buf = new Buffer(24 + payload.length);
buf.fill(0);

View File

@ -53,14 +53,31 @@ if (!process.browser) {
it("should allow to interconnect two nodes", function(done) {
tcp.connect(22333, "127.0.0.1");
tcp.on("open", function() {
tcp.once("open", function() {
done();
});
});
it("should establish connection", function(done) {
tcp.once("established", function() {
done();
});
});
it("should allow to communicate", function(done) {
tcp.on("message", function cb(command, payload) {
if (command === "echo-res") {
expect(payload.toString()).to.equal("test");
tcp.removeListener("message", cb);
done();
}
});
tcp.send("echo-req", Buffer("test"));
});
it("should allow to close connection", function(done) {
tcp.close();
tcp.on("close", function() {
tcp.once("close", function() {
done();
});
});

View File

@ -1,8 +1,15 @@
var TcpTransport = require("../lib/net/tcp");
function start() {
var tcp = new TcpTransport();
tcp.listen(22333, "127.0.0.1");
var server = new TcpTransport();
server.listen(22333, "127.0.0.1");
server.on("connection", function(client) {
client.on("message", function(command, payload) {
if (command === "echo-req") {
client.send("echo-res", payload);
}
});
});
}
start();