WebSocket transport

This commit is contained in:
Kagami Hiiragi 2015-02-10 00:56:55 +03:00
parent 142cb76e6b
commit 084ddc8084
9 changed files with 415 additions and 71 deletions

View File

@ -56,7 +56,7 @@ API documentation is available [here](https://bitchan.github.io/bitmessage/docs/
- [x] UserAgent - [x] UserAgent
- [ ] Network transports - [ ] Network transports
- [x] TCP (Node.js only) - [x] TCP (Node.js only)
- [ ] WebSocket - [x] WebSocket
- [ ] WebRTC - [ ] WebRTC
- [ ] Parse PyBitmessage configs - [ ] Parse PyBitmessage configs
- [ ] keys.dat - [ ] keys.dat

View File

@ -25,8 +25,7 @@ var ServicesBitfield = structs.ServicesBitfield;
* Note that this function doesn't do any validation because it is * Note that this function doesn't do any validation because it is
* already provided by * already provided by
* [message.decode]{@link module:bitmessage/structs.message.decode} * [message.decode]{@link module:bitmessage/structs.message.decode}
* routine. Normally you call this for each incoming message and then * routine.
* call decode function of the appropriate message handler.
* @param {Buffer} buf - Buffer that starts with encoded message * @param {Buffer} buf - Buffer that starts with encoded message
* @return {?string} Message's command if any. * @return {?string} Message's command if any.
*/ */

View File

@ -10,6 +10,7 @@
var inherits = require("inherits"); var inherits = require("inherits");
var EventEmitter = require("events").EventEmitter; var EventEmitter = require("events").EventEmitter;
var PPromise = require("../platform").Promise; var PPromise = require("../platform").Promise;
var structs = require("../structs");
/** /**
* Network transport base class. * Network transport base class.
@ -84,4 +85,23 @@ BaseTransport.prototype.close = function() {
throw new Error("Not implemented"); throw new Error("Not implemented");
}; };
// Static helpers.
BaseTransport._getmsg = function(args) {
if (typeof args[0] === "string") {
return structs.message.encode(args[0], args[1]);
} else {
return args[0];
}
};
// Unmap IPv4-mapped IPv6 address.
BaseTransport._unmap = function(addr) {
if (addr.slice(0, 7) === "::ffff:") {
return addr.slice(7);
} else {
return addr;
}
};
module.exports = BaseTransport; module.exports = BaseTransport;

View File

@ -16,6 +16,9 @@ var structs = require("../structs");
var messages = require("../messages"); var messages = require("../messages");
var BaseTransport = require("./base"); var BaseTransport = require("./base");
var getmsg = BaseTransport._getmsg;
var unmap = BaseTransport._unmap;
/** /**
* TCP transport constructor. * TCP transport constructor.
* @constructor * @constructor
@ -27,20 +30,12 @@ function Transport(opts) {
this.seeds = this.seeds || []; this.seeds = this.seeds || [];
this.dnsSeeds = this.dnsSeeds || []; this.dnsSeeds = this.dnsSeeds || [];
this._clients = {}; this._clients = {};
if (this._client) {
this._setupClient();
}
} }
inherits(Transport, BaseTransport); inherits(Transport, BaseTransport);
// Unmap IPv4-mapped IPv6 address. function getfrom(client) {
function unmap(addr) { return unmap(client.remoteAddress) + ":" + client.remotePort;
if (addr.slice(0, 7) === "::ffff:") {
return addr.slice(7);
} else {
return addr;
}
} }
Transport.prototype.sendVersion = function() { Transport.prototype.sendVersion = function() {
@ -54,9 +49,9 @@ Transport.prototype.sendVersion = function() {
})); }));
}; };
Transport.prototype._setupClient = function() { Transport.prototype._setupClient = function(client, accepted) {
var self = this; var self = this;
var client = self._client; self._client = client;
var cache = Buffer(0); var cache = Buffer(0);
var decoded; var decoded;
var verackSent = false; var verackSent = false;
@ -69,10 +64,10 @@ Transport.prototype._setupClient = function() {
client.setTimeout(20000); client.setTimeout(20000);
client.on("connect", function() { client.on("connect", function() {
self.emit("open");
// NOTE(Kagami): This handler shouldn't be called at all for // NOTE(Kagami): This handler shouldn't be called at all for
// accepted sockets but let's be sure. // accepted sockets but let's be sure.
if (!self._accepted) { if (!accepted) {
self.emit("open");
self.sendVersion(); self.sendVersion();
} }
}); });
@ -100,9 +95,7 @@ Transport.prototype._setupClient = function() {
// TODO(Kagami): Send `error` message and ban node for some time // TODO(Kagami): Send `error` message and ban node for some time
// if there were too many errors? // if there were too many errors?
self.emit("warning", new Error( self.emit("warning", new Error(
"Message decoding error from " + "Message decoding error from " + getfrom(client) + ": " +
unmap(client.remoteAddress) + ":" + client.remotePort,
": " +
decoded.error decoded.error
)); ));
} }
@ -119,10 +112,9 @@ Transport.prototype._setupClient = function() {
} }
self.send("verack"); self.send("verack");
verackSent = true; verackSent = true;
if (self._accepted) { if (accepted) {
self.sendVersion(); self.sendVersion();
} } else if (verackReceived) {
if (verackReceived) {
self.emit("established"); self.emit("established");
} }
} else if (command === "verack") { } else if (command === "verack") {
@ -152,9 +144,6 @@ Transport.prototype._setupClient = function() {
client.on("close", function() { client.on("close", function() {
self.emit("close"); self.emit("close");
delete self._client; delete self._client;
verackSent = false;
verackReceived = false;
established = false;
}); });
}; };
@ -204,9 +193,7 @@ Transport.prototype.bootstrap = function() {
Transport.prototype.connect = function() { Transport.prototype.connect = function() {
assert(!this._client, "Already connected"); assert(!this._client, "Already connected");
assert(!this._server, "Already listening"); assert(!this._server, "Already listening");
this._setupClient(net.connect.apply(null, arguments));
this._client = net.connect.apply(null, arguments);
this._setupClient();
}; };
Transport.prototype.listen = function() { Transport.prototype.listen = function() {
@ -227,21 +214,20 @@ Transport.prototype.listen = function() {
// This may obstruct people behind NAT but we copy PyBitmessage's // This may obstruct people behind NAT but we copy PyBitmessage's
// behavior here. // behavior here.
client.end(); client.end();
self.emit("warning", new Error( return self.emit("warning", new Error(
addr + " was tried to create second connection" unmap(addr) + " was tried to create second connection"
)); ));
return;
} }
self._clients[addr] = client; self._clients[addr] = client;
client.on("close", function() { client.on("close", function() {
delete self._clients[addr]; delete self._clients[addr];
}); });
var transport = new self.constructor(objectAssign({}, self, { var transport = new self.constructor(self);
_client: client, var accepted = true;
_accepted: true, transport._setupClient(client, accepted);
}));
self.emit("connection", transport, unmap(addr), port); self.emit("connection", transport, unmap(addr), port);
// Emit "open" manually because "connect" won't be emitted. // Emit "open" manually because "connect" for this socket won't be
// fired.
transport.emit("open"); transport.emit("open");
}); });
@ -255,14 +241,6 @@ Transport.prototype.listen = function() {
}); });
}; };
function getmsg(args) {
if (typeof args[0] === "string") {
return structs.message.encode(args[0], args[1]);
} else {
return args[0];
}
}
Transport.prototype.send = function() { Transport.prototype.send = function() {
if (this._client) { if (this._client) {
this._client.write(getmsg(arguments)); this._client.write(getmsg(arguments));

View File

@ -1,29 +1,112 @@
/** /**
* WebSocket transport. Generally needed because browsers can't handle * WebSocket transport. Used in browser in client-mode only. Server
* TCP sockets so we proxy messages from clients via WebSocket into TCP * handle incoming messages and wrap them into TCP data packets.
* data packets.
*/ */
"use strict"; "use strict";
var objectAssign = Object.assign || require("object-assign");
var inherits = require("inherits"); var inherits = require("inherits");
var assert = require("../_util").assert; var assert = require("../_util").assert;
var structs = require("../structs");
var messages = require("../messages");
var BaseTransport = require("./base"); var BaseTransport = require("./base");
/** function Transport(opts) {
* WebSocket transport constructor.
* @constructor
* @static
*/
function Transport() {
Transport.super_.call(this); Transport.super_.call(this);
objectAssign(this, opts);
this.seeds = this.seeds || [];
} }
inherits(Transport, BaseTransport); inherits(Transport, BaseTransport);
Transport.prototype.connect = function(opts) { Transport.prototype.bootstrap = function() {
assert(!this._client, "Already connected"); return Promise.resolve([].concat(this.seeds));
this._client = new WebSocket(opts); };
Transport.prototype.connect = function(url, protocols) {
var self = this;
assert(!self._client, "Already connected");
// TODO(Kagami): Handle timeouts!
var client = self._client = new WebSocket(url, protocols);
client.binaryType = "arraybuffer";
var verackSent = false;
var verackReceived = false;
var established = false;
client.onopen = function() {
self.emit("open");
self.send(messages.version.encode({
services: self.services,
userAgent: self.userAgent,
streamNumbers: self.streamNumbers,
// This parameters are not used by the remote node so we fake them
// (because we can't resolve domain name in Browser).
remoteHost: "127.0.0.1",
remotePort: 8444,
}));
};
client.onmessage = function(e) {
var buf = new Buffer(new Uint8Array(e.data));
var decoded;
try {
decoded = structs.message.decode(buf);
} catch (err) {
return self.emit("warning", new Error(
"Message decoding error from " + url + ": " + err
));
}
self.emit("message", decoded.command, decoded.payload, decoded);
};
// High-level message processing.
self.on("message", function(command) {
if (!established) {
// TODO: Process version data.
if (command === "version") {
if (verackSent) {
return;
}
self.send("verack");
verackSent = true;
if (verackReceived) {
established = true;
self.emit("established");
}
} else if (command === "verack") {
verackReceived = true;
if (verackSent) {
established = true;
self.emit("established");
}
}
}
});
client.onerror = function(err) {
self.emit("error", err);
};
client.onclose = function() {
self.emit("close");
delete self._client;
};
};
Transport.prototype.send = function() {
if (this._client) {
this._client.send(BaseTransport._getmsg(arguments));
} else {
throw new Error("Not connected");
}
};
Transport.prototype.close = function() {
if (this._client) {
this._client.close();
}
}; };
module.exports = Transport; module.exports = Transport;

View File

@ -1,38 +1,230 @@
/** /**
* WebSocket transport. Generally needed because browsers can't handle * WebSocket transport. Needed because browsers can't handle TCP sockets
* TCP sockets so we proxy messages from clients via WebSocket into TCP * so we use separate WebSocket server to proxy messages into TCP data
* data packets. * packets.
* @module bitmessage/net/ws * @module bitmessage/net/ws
*/ */
"use strict"; "use strict";
var objectAssign = Object.assign || require("object-assign");
var inherits = require("inherits"); var inherits = require("inherits");
var WebSocket = require("ws"); // jshint ignore:line var WebSocket = require("ws"); // jshint ignore:line
var assert = require("../_util").assert; var assert = require("../_util").assert;
var PPromise = require("../platform").Promise;
var structs = require("../structs");
var messages = require("../messages");
var BaseTransport = require("./base"); var BaseTransport = require("./base");
var WebSocketServer = WebSocket.Server; var WebSocketServer = WebSocket.Server;
var getmsg = BaseTransport._getmsg;
var unmap = BaseTransport._unmap;
/** /**
* WebSocket transport constructor. * WebSocket transport constructor.
* @constructor * @constructor
* @static * @static
*/ */
function Transport() { function Transport(opts) {
Transport.super_.call(this); Transport.super_.call(this);
objectAssign(this, opts);
this.seeds = this.seeds || [];
} }
inherits(Transport, BaseTransport); inherits(Transport, BaseTransport);
Transport.prototype.connect = function(opts) { function getfrom(client) {
assert(!this._client, "Already connected"); return unmap(client._socket.remoteAddress) + ":" + client._socket.remotePort;
this._client = new WebSocket(opts); }
Transport.prototype.sendVersion = function() {
return this.send(messages.version.encode({
services: this.services,
userAgent: this.userAgent,
streamNumbers: this.streamNumbers,
port: this.port,
remoteHost: this._client._socket.remoteAddress,
remotePort: this._client._socket.remotePort,
}));
}; };
Transport.prototype.listen = function(opts) { Transport.prototype._handleTimeout = function() {
var client = this._client;
// TODO(Kagami): We may also want to close connection if it wasn't
// established within minute.
client._socket.setTimeout(20000);
client._socket.on("timeout", function() {
client.close();
});
this.on("established", function() {
// Raise timeout up to 10 minutes per spec.
// TODO(Kagami): Send ping frame every 5 minutes as PyBitmessage.
client._socket.setTimeout(600000);
});
};
Transport.prototype._setupClient = function(client, accepted) {
var self = this;
self._client = client;
var verackSent = false;
var verackReceived = false;
var established = false;
client.on("open", function() {
// NOTE(Kagami): This handler shouldn't be called at all for
// accepted sockets but let's be sure.
if (!accepted) {
// NOTE(Kagami): We may set timeout only after connection was
// opened because socket may not yet be available when
// `_setupClient` is called.
self._handleTimeout();
self.emit("open");
self.sendVersion();
}
});
client.on("message", function(data, flags) {
var decoded;
if (!flags.binary) {
// TODO(Kagami): Send `error` message and ban node for some time
// if there were too many errors?
return self.emit("warning", new Error(
"Peer " + getfrom(client) + " sent non-binary data"
));
}
try {
decoded = structs.message.decode(data);
} catch (err) {
return self.emit("warning", new Error(
"Message decoding error from " + getfrom(client) + ": " + err
));
}
self.emit("message", decoded.command, decoded.payload, decoded);
});
// High-level message processing.
self.on("message", function(command) {
if (!established) {
// TODO: Process version data.
if (command === "version") {
if (verackSent) {
return;
}
self.send("verack");
verackSent = true;
if (accepted) {
self.sendVersion();
} else if (verackReceived) {
established = true;
self.emit("established");
}
} else if (command === "verack") {
verackReceived = true;
if (verackSent) {
established = true;
self.emit("established");
}
}
}
});
client.on("error", function(err) {
self.emit("error", err);
});
client.on("close", function() {
self.emit("close");
delete self._client;
});
};
Transport.prototype.bootstrap = function() {
return PPromise.resolve([].concat(this.seeds));
};
Transport.prototype.connect = function(address, protocols, options) {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening"); assert(!this._server, "Already listening");
this._server = new WebSocketServer(opts); // `new` doesn't work with `apply`, so passing all possible arguments
// manually.
this._setupClient(new WebSocket(address, protocols, options));
};
Transport.prototype.listen = function(options, callback) {
assert(!this._client, "Already connected");
assert(!this._server, "Already listening");
var self = this;
var server = self._server = new WebSocketServer(options, callback);
// TODO(Kagami): We may want to specify some limits for number of
// connected users.
server.on("connection", function(client) {
var addr = client._socket.remoteAddress;
var port = client._remotePort;
var i;
// NOTE(Kagami): O(n) search because `clients` array is already
// provided by `ws` library. We may want to optmize it though and
// also disable `clientTracking` option.
for (i = 0; i < server.clients.length; i++) {
if (server.clients[i] !== client &&
server.clients[i]._socket.remoteAddress === addr) {
// NOTE(Kagami): Doesn't allow more than one connection per IP.
// This may obstruct people behind NAT but we copy
// PyBitmessage's behavior here.
client.close();
return self.emit("warning", new Error(
unmap(addr) + " was tried to create second connection"
));
}
}
var transport = new self.constructor(self);
var accepted = true;
transport._setupClient(client, accepted);
transport._handleTimeout();
self.emit("connection", transport, unmap(addr), port);
// Emit "open" manually because it won't be fired for already opened
// socket.
transport.emit("open");
});
server.on("error", function(err) {
self.emit("error", err);
});
};
Transport.prototype.send = function() {
if (this._client) {
// TODO(Kagami): `mask: true` doesn't work with Chromium 40. File a
// bug to ws bugtracker.
this._client.send(getmsg(arguments), {binary: true});
} else {
throw new Error("Not connected");
}
};
Transport.prototype.broadcast = function() {
var data = getmsg(arguments);
if (this._server) {
this._server.clients.forEach(function(client) {
client.send(data, {binary: true});
});
} else {
throw new Error("Not listening");
}
};
Transport.prototype.close = function() {
if (this._client) {
this._client.close();
} else if (this._server) {
// `ws` server terminates immediately without any events.
this._server.close();
this.emit("close");
delete this._server;
}
}; };
module.exports = Transport; module.exports = Transport;

View File

@ -14,10 +14,13 @@ if (!process.browser) {
before(function(done) { before(function(done) {
tcp = new TcpTransport(); tcp = new TcpTransport();
tcp.on("error", function(err) { tcp.on("error", function(err) {
console.log("TCP transport error:", err); console.log("TCP transport error: " + err);
}); });
// Wait some time for server. tcp.on("warning", function(warn) {
setTimeout(done, 1000); console.log("TCP transport warning: " + warn);
});
// Wait some time for the server.
setTimeout(done, 300);
}); });
it("should return nothing on bootstrap by default", function() { it("should return nothing on bootstrap by default", function() {
@ -58,7 +61,7 @@ if (!process.browser) {
}); });
}); });
it("should establish connection", function(done) { it("should automatically establish connection", function(done) {
tcp.once("established", function() { tcp.once("established", function() {
done(); done();
}); });
@ -85,5 +88,57 @@ if (!process.browser) {
} }
describe("WebSocket transport", function() { describe("WebSocket transport", function() {
it("should allow to communicate between two nodes"); var ws;
before(function(done) {
ws = new WsTransport();
ws.on("error", function(err) {
console.log("WebSocket transport error: " + err);
});
ws.on("warning", function(warn) {
console.log("WebSocket transport warning: " + warn);
});
// Wait some time for the server.
setTimeout(done, 300);
});
it("should return hardcoded seeds on bootstrap", function() {
var ws2 = new WsTransport({seeds: [["ws.example.com", 8080]]});
return ws2.bootstrap().then(function(nodes) {
expect(nodes).to.have.length(1);
expect(nodes[0][0]).to.be.equal("ws.example.com");
expect(nodes[0][1]).to.be.equal(8080);
});
});
it("should allow to interconnect two nodes", function(done) {
ws.connect("ws://127.0.0.1:22334");
ws.once("open", function() {
done();
});
});
it("should automatically establish connection", function(done) {
ws.once("established", function() {
done();
});
});
it("should allow to communicate", function(done) {
ws.on("message", function cb(command, payload) {
if (command === "echo-res") {
expect(payload.toString()).to.equal("test");
ws.removeListener("message", cb);
done();
}
});
ws.send("echo-req", Buffer("test"));
});
it("should allow to close connection", function(done) {
ws.close();
ws.once("close", function() {
done();
});
});
}); });

View File

@ -2,6 +2,8 @@ var TcpTransport = require("../lib/net/tcp");
function start() { function start() {
var server = new TcpTransport(); var server = new TcpTransport();
// In node 0.12/io 1.0 we can use {host: x, port: y} syntax so it'll
// be more compatible with the ws transport options.
server.listen(22333, "127.0.0.1"); server.listen(22333, "127.0.0.1");
server.on("connection", function(client) { server.on("connection", function(client) {
client.on("message", function(command, payload) { client.on("message", function(command, payload) {

View File

@ -0,0 +1,15 @@
var WsTransport = require("../lib/net/ws");
function start() {
var server = new WsTransport();
server.listen({host: "127.0.0.1", port: 22334});
server.on("connection", function(client) {
client.on("message", function(command, payload) {
if (command === "echo-req") {
client.send("echo-res", payload);
}
});
});
}
start();