Decode and validate version message in transports
This commit is contained in:
parent
ec8e4e1cb2
commit
c916023393
|
@ -12,8 +12,10 @@
|
|||
|
||||
var inherits = require("inherits");
|
||||
var EventEmitter = require("events").EventEmitter;
|
||||
var util = require("../_util");
|
||||
var PPromise = require("../platform").Promise;
|
||||
var structs = require("../structs");
|
||||
var messages = require("../messages");
|
||||
|
||||
/**
|
||||
* Base transport class. Allows to use single class for both client and
|
||||
|
@ -91,8 +93,9 @@ BaseTransport.prototype.close = function() {
|
|||
throw new Error("Not implemented");
|
||||
};
|
||||
|
||||
// Static private helpers.
|
||||
// Private helpers.
|
||||
|
||||
// Make a message from variable number of arguments.
|
||||
BaseTransport._getmsg = function(args) {
|
||||
if (typeof args[0] === "string") {
|
||||
return structs.message.encode(args[0], args[1]);
|
||||
|
@ -110,4 +113,53 @@ BaseTransport._unmap = function(addr) {
|
|||
}
|
||||
};
|
||||
|
||||
// Check whether two given arrays intersect.
|
||||
// NOTE(Kagami): It has O(n*m) complexity in the worst case but:
|
||||
// * Max length of stream list = 160,000
|
||||
// * One of the arrays (our streams) should have reasonable length
|
||||
function intersects(a, b) {
|
||||
var alen = a.length;
|
||||
var blen = b.length;
|
||||
if (!alen || !blen) {
|
||||
return false;
|
||||
}
|
||||
var i, j;
|
||||
for (i = 0; i < alen; ++i) {
|
||||
for (j = 0; j < blen; ++j) {
|
||||
if (a[i] === b[j]) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Decode and validate version message.
|
||||
BaseTransport.prototype._decodeVersion = function(payload) {
|
||||
var version;
|
||||
try {
|
||||
version = messages.version.decodePayload(payload);
|
||||
} catch(err) {
|
||||
throw new Error("Version decode error: " + err.message);
|
||||
}
|
||||
if (version.version < util.PROTOCOL_VERSION) {
|
||||
throw new Error("Peer uses old protocol v" + version.version);
|
||||
}
|
||||
// TODO(Kagami): We may want to send error message describing the time
|
||||
// offset problem to this node as PyBitmessage.
|
||||
var delta = (version.time.getTime() - new Date().getTime()) / 1000;
|
||||
if (delta > 3600) {
|
||||
throw new Error("Peer's time is too far in the future: +" + delta + "s");
|
||||
}
|
||||
if (delta < -3600) {
|
||||
throw new Error("Peer's time is too far in the past: " + delta + "s");
|
||||
}
|
||||
if (!intersects(this.streamNumbers, version.streamNumbers)) {
|
||||
throw new Error(
|
||||
"Peer isn't interested in our streams; " +
|
||||
"first 10 peer's streams: " + version.streamNumbers.slice(0, 10)
|
||||
);
|
||||
}
|
||||
return version;
|
||||
};
|
||||
|
||||
module.exports = BaseTransport;
|
||||
|
|
|
@ -38,12 +38,14 @@ var objectAssign = Object.assign || require("object-assign");
|
|||
var inherits = require("inherits");
|
||||
var net = require("net");
|
||||
var dns = require("dns");
|
||||
var assert = require("../_util").assert;
|
||||
var util = require("../_util");
|
||||
var PPromise = require("../platform").Promise;
|
||||
var structs = require("../structs");
|
||||
var messages = require("../messages");
|
||||
var BaseTransport = require("./base");
|
||||
|
||||
var assert = util.assert;
|
||||
var ServicesBitfield = structs.ServicesBitfield;
|
||||
var getmsg = BaseTransport._getmsg;
|
||||
var unmap = BaseTransport._unmap;
|
||||
|
||||
|
@ -58,9 +60,9 @@ var unmap = BaseTransport._unmap;
|
|||
* provided by this node (`NODE_NETWORK` by default)
|
||||
* @param {(Array|string|Buffer)} opts.userAgent -
|
||||
* [User agent]{@link module:bitmessage/user-agent} of this node
|
||||
* (bitmessage's by default)
|
||||
* (user agent of bitmessage library by default)
|
||||
* @param {number[]} opts.streamNumbers - Streams accepted by this node
|
||||
* (1 by default)
|
||||
* ([1] by default)
|
||||
* @param {number} opts.port - Incoming port of this node (8444 by
|
||||
* default)
|
||||
* @constructor
|
||||
|
@ -71,6 +73,7 @@ function TcpTransport(opts) {
|
|||
objectAssign(this, opts);
|
||||
this.seeds = this.seeds || [];
|
||||
this.dnsSeeds = this.dnsSeeds || [];
|
||||
this.streamNumbers = this.streamNumbers || [1];
|
||||
this._clients = {};
|
||||
}
|
||||
|
||||
|
@ -140,25 +143,38 @@ TcpTransport.prototype._setupClient = function(client, incoming) {
|
|||
});
|
||||
|
||||
// High-level message processing.
|
||||
self.on("message", function(command) {
|
||||
self.on("message", function(command, payload) {
|
||||
var version;
|
||||
if (!established) {
|
||||
// TODO: Process version data.
|
||||
// TODO: Disconnect if proto version < 3.
|
||||
if (command === "version") {
|
||||
if (verackSent) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
version = self._decodeVersion(payload);
|
||||
} catch(err) {
|
||||
self.emit("error", err);
|
||||
return client.end();
|
||||
}
|
||||
if (!version.services.get(ServicesBitfield.NODE_NETWORK)) {
|
||||
self.emit("error", new Error(
|
||||
"Not a normal network node: " + version.services
|
||||
));
|
||||
return client.end();
|
||||
}
|
||||
|
||||
self.send("verack");
|
||||
verackSent = true;
|
||||
if (incoming) {
|
||||
self._sendVersion();
|
||||
} else if (verackReceived) {
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
} else if (command === "verack") {
|
||||
verackReceived = true;
|
||||
if (verackSent) {
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,17 @@ var structs = require("../structs");
|
|||
var messages = require("../messages");
|
||||
var BaseTransport = require("./base");
|
||||
|
||||
var ServicesBitfield = structs.ServicesBitfield;
|
||||
var getmsg = BaseTransport._getmsg;
|
||||
|
||||
function WsTransport(opts) {
|
||||
WsTransport.super_.call(this);
|
||||
objectAssign(this, opts);
|
||||
this.seeds = this.seeds || [];
|
||||
this.services = this.services || ServicesBitfield().set([
|
||||
ServicesBitfield.NODE_MOBILE,
|
||||
]);
|
||||
this.streamNumbers = this.streamNumbers || [1];
|
||||
}
|
||||
|
||||
inherits(WsTransport, BaseTransport);
|
||||
|
@ -62,25 +69,38 @@ WsTransport.prototype.connect = function(url, protocols) {
|
|||
};
|
||||
|
||||
// High-level message processing.
|
||||
self.on("message", function(command) {
|
||||
self.on("message", function(command, payload) {
|
||||
var version;
|
||||
if (!established) {
|
||||
// TODO: Process version data.
|
||||
// TODO: Disconnect if proto version < 3.
|
||||
if (command === "version") {
|
||||
if (verackSent) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
version = self._decodeVersion(payload);
|
||||
} catch(err) {
|
||||
self.emit("error", err);
|
||||
return client.close();
|
||||
}
|
||||
if (!version.services.get(ServicesBitfield.NODE_GATEWAY)) {
|
||||
self.emit("error", new Error(
|
||||
"Not a gateway node: " + version.services
|
||||
));
|
||||
return client.close();
|
||||
}
|
||||
|
||||
self.send("verack");
|
||||
verackSent = true;
|
||||
if (verackReceived) {
|
||||
established = true;
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
} else if (command === "verack") {
|
||||
verackReceived = true;
|
||||
if (verackSent) {
|
||||
established = true;
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -98,7 +118,7 @@ WsTransport.prototype.connect = function(url, protocols) {
|
|||
|
||||
WsTransport.prototype.send = function() {
|
||||
if (this._client) {
|
||||
this._client.send(BaseTransport._getmsg(arguments));
|
||||
this._client.send(getmsg(arguments));
|
||||
} else {
|
||||
throw new Error("Not connected");
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ var messages = require("../messages");
|
|||
var BaseTransport = require("./base");
|
||||
|
||||
var WebSocketServer = WebSocket.Server;
|
||||
var ServicesBitfield = structs.ServicesBitfield;
|
||||
var getmsg = BaseTransport._getmsg;
|
||||
var unmap = BaseTransport._unmap;
|
||||
|
||||
|
@ -30,12 +31,13 @@ var unmap = BaseTransport._unmap;
|
|||
* @param {Array} opts.seeds - Bootstrap nodes (none by default)
|
||||
* @param {Object} opts.services -
|
||||
* [Service features]{@link module:bitmessage/structs.ServicesBitfield}
|
||||
* provided by this node (`NODE_NETWORK` by default)
|
||||
* provided by this node (`NODE_MOBILE` for Browser and `NODE_MOBILE` +
|
||||
* `NODE_GATEWAY` for Node by default)
|
||||
* @param {(Array|string|Buffer)} opts.userAgent -
|
||||
* [User agent]{@link module:bitmessage/user-agent} of this node
|
||||
* (bitmessage's by default)
|
||||
* (user agent of bitmessage library by default)
|
||||
* @param {number[]} opts.streamNumbers - Streams accepted by this node
|
||||
* (1 by default)
|
||||
* ([1] by default)
|
||||
* @param {number} opts.port - Incoming port of this node, makes sence
|
||||
* only on Node platform (18444 by default)
|
||||
* @constructor
|
||||
|
@ -45,6 +47,11 @@ function WsTransport(opts) {
|
|||
WsTransport.super_.call(this);
|
||||
objectAssign(this, opts);
|
||||
this.seeds = this.seeds || [];
|
||||
this.services = this.services || ServicesBitfield().set([
|
||||
ServicesBitfield.NODE_MOBILE,
|
||||
ServicesBitfield.NODE_GATEWAY,
|
||||
]);
|
||||
this.streamNumbers = this.streamNumbers || [1];
|
||||
this.port = this.port || 18444;
|
||||
}
|
||||
|
||||
|
@ -114,27 +121,46 @@ WsTransport.prototype._setupClient = function(client, incoming) {
|
|||
});
|
||||
|
||||
// High-level message processing.
|
||||
self.on("message", function(command) {
|
||||
self.on("message", function(command, payload) {
|
||||
var version;
|
||||
if (!established) {
|
||||
// TODO: Process version data.
|
||||
// TODO: Disconnect if proto version < 3.
|
||||
if (command === "version") {
|
||||
if (verackSent) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
version = self._decodeVersion(payload);
|
||||
} catch(err) {
|
||||
self.emit("error", err);
|
||||
return client.close();
|
||||
}
|
||||
if (incoming && !version.services.get(ServicesBitfield.NODE_MOBILE)) {
|
||||
self.emit("error", new Error(
|
||||
"Not a mobile node: " + version.services
|
||||
));
|
||||
return client.close();
|
||||
}
|
||||
if (!incoming && !version.services.get(ServicesBitfield.NODE_GATEWAY)) {
|
||||
self.emit("error", new Error(
|
||||
"Not a gateway node: " + version.services
|
||||
));
|
||||
return client.close();
|
||||
}
|
||||
|
||||
self.send("verack");
|
||||
verackSent = true;
|
||||
if (incoming) {
|
||||
self._sendVersion();
|
||||
} else if (verackReceived) {
|
||||
established = true;
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
} else if (command === "verack") {
|
||||
verackReceived = true;
|
||||
if (verackSent) {
|
||||
established = true;
|
||||
self.emit("established");
|
||||
self.emit("established", version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ var encode = exports.encode = function(software) {
|
|||
};
|
||||
|
||||
/**
|
||||
* Encode bitmessage's library user agent.
|
||||
* Encode user agent of bitmessage library.
|
||||
* @return {Buffer} Encoded user agent.
|
||||
*/
|
||||
exports.encodeSelf = function() {
|
||||
|
@ -100,7 +100,7 @@ exports.encodeSelf = function() {
|
|||
};
|
||||
|
||||
/**
|
||||
* Encode user agent with bitmessage's library user agent underneath.
|
||||
* Encode user agent with user agent of bitmessage library underneath.
|
||||
* Most underlying software comes first.
|
||||
* @param {(Object[]|string[]|Object|string)} software - List of
|
||||
* software to encode
|
||||
|
|
|
@ -2,6 +2,7 @@ var expect = require("chai").expect;
|
|||
|
||||
var bitmessage = require("../lib");
|
||||
var structs = bitmessage.structs;
|
||||
var ServicesBitfield = structs.ServicesBitfield;
|
||||
var message = structs.message;
|
||||
var WsTransport = require("../lib/net/ws");
|
||||
|
||||
|
@ -62,7 +63,13 @@ if (!process.browser) {
|
|||
});
|
||||
|
||||
it("should automatically establish connection", function(done) {
|
||||
tcp.once("established", function() {
|
||||
tcp.once("established", function(version) {
|
||||
expect(version.version).to.equal(3);
|
||||
expect(version.services.get(ServicesBitfield.NODE_NETWORK)).to.be.true;
|
||||
expect(version.remoteHost).to.equal("127.0.0.1");
|
||||
expect(version.port).to.equal(22333);
|
||||
expect(version.userAgent).to.be.a("string");
|
||||
expect(version.streamNumbers).to.deep.equal([1]);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
@ -119,7 +126,12 @@ describe("WebSocket transport", function() {
|
|||
});
|
||||
|
||||
it("should automatically establish connection", function(done) {
|
||||
ws.once("established", function() {
|
||||
ws.once("established", function(version) {
|
||||
expect(version.services.get(ServicesBitfield.NODE_GATEWAY)).to.be.true;
|
||||
expect(version.remoteHost).to.equal("127.0.0.1");
|
||||
expect(version.port).to.equal(22334);
|
||||
expect(version.userAgent).to.be.a("string");
|
||||
expect(version.streamNumbers).to.deep.equal([1]);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
var TcpTransport = require("../lib/net/tcp");
|
||||
|
||||
function start() {
|
||||
var server = new TcpTransport();
|
||||
var server = new TcpTransport({port: 22333});
|
||||
// In node 0.12/io 1.0 we can use {host: x, port: y} syntax so it'll
|
||||
// be more compatible with the ws transport options.
|
||||
server.listen(22333, "127.0.0.1");
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
var WsTransport = require("../lib/net/ws");
|
||||
|
||||
function start() {
|
||||
var server = new WsTransport();
|
||||
var server = new WsTransport({port: 22334});
|
||||
server.listen({host: "127.0.0.1", port: 22334});
|
||||
server.on("connection", function(client) {
|
||||
client.on("message", function(command, payload) {
|
||||
|
|
Loading…
Reference in New Issue
Block a user