Source: net/tcp.js

/**
 * TCP transport compatible with PyBitmessage. Available only for Node
 * platform.  
 * **NOTE**: `TcpTransport` is exported as a module.
 * @module bitmessage/net/tcp
 * @example
 * var messages = require("bitmessage").messages;
 * var TcpTransport = require("bitmessage/lib/net/tcp");
 *
 * var tcp = new TcpTransport({
 *   dnsSeeds: [["bootstrap8444.bitmessage.org", 8444]],
 * });
 *
 * tcp.bootstrap().then(function(nodes) {
 *   var remoteHost = nodes[0][0];
 *   var remotePort = nodes[0][1];
 *   console.log("Connecting to", nodes[0]);
 *   tcp.connect(remotePort, remoteHost);
 * });
 *
 * tcp.on("established", function() {
 *   console.log("Connection established");
 *
 *   tcp.on("message", function(command, payload) {
 *     console.log("Got new", command, "message");
 *     var decoded;
 *     if (command === "addr") {
 *       decoded = messages.addr.decodePayload(payload);
 *       console.log("Got", decoded.addrs.length, "node addresses");
 *     }
 *   });
 * });
 */

"use strict";

var objectAssign = Object.assign || require("object-assign");
var inherits = require("inherits");
var net = require("net");
var dns = require("dns");
var util = require("../_util");
var PPromise = require("../platform").Promise;
var structs = require("../structs");
var messages = require("../messages");
var BaseTransport = require("./base");

var assert = util.assert;
var getmsg = BaseTransport._getmsg;
var unmap = BaseTransport._unmap;

/**
 * TCP transport class. Implements [base transport interface]{@link
 * module:bitmessage/net/base.BaseTransport}.
 * @param {Object=} opts - Transport options
 * @param {Array} opts.seeds - Bootstrap nodes (none by default)
 * @param {Array} opts.dnsSeeds - Bootstrap DNS nodes (none by default)
 * @param {Object} opts.services -
 * [Service features]{@link module:bitmessage/structs.ServicesBitfield}
 * provided by this node (`NODE_NETWORK` by default)
 * @param {(Array|string|Buffer)} opts.userAgent -
 * [User agent]{@link module:bitmessage/user-agent} of this node
 * (user agent of bitmessage library by default)
 * @param {number[]} opts.streams - Streams accepted by this node ([1]
 * by default)
 * @param {number} opts.port - Incoming port of this node (8444 by
 * default)
 * @constructor
 * @static
 */
function TcpTransport(opts) {
  TcpTransport.super_.call(this);
  objectAssign(this, opts);
  this.seeds = this.seeds || [];
  this.dnsSeeds = this.dnsSeeds || [];
  this.streams = this.streams || [1];
  this._clients = {};
}

inherits(TcpTransport, BaseTransport);

TcpTransport.prototype._sendVersion = function() {
  return this.send(messages.version.encode({
    services: this.services,
    userAgent: this.userAgent,
    streams: this.streams,
    port: this.port,
    remoteHost: this._client.remoteAddress,
    remotePort: this._client.remotePort,
  }));
};

TcpTransport.prototype._setupClient = function(client, incoming) {
  var self = this;
  self._client = client;
  var cache = Buffer(0);
  var decoded;
  var verackSent = false;
  var verackReceived = false;
  var established = false;

  // Set default transport timeout per spec.
  // TODO(Kagami): We may also want to close connection if it wasn't
  // established within minute.
  client.setTimeout(20000);

  client.on("connect", function() {
    // NOTE(Kagami): This handler shouldn't be called at all for
    // incoming connections but let's be sure.
    if (!incoming) {
      self.emit("open");
      self._sendVersion();
    }
  });

  client.on("data", function(data) {
    // TODO(Kagami): We may want to preallocate 1.6M buffer for each
    // client instead (max size of the message) to not constantly
    // allocate new buffers. Though this may lead to another issues: too
    // many memory per client.
    cache = Buffer.concat([cache, data]);
    while (true) {
      decoded = structs.message.tryDecode(cache);
      if (!decoded) {
        break;
      }
      cache = decoded.rest;
      if (decoded.message) {
        self.emit("message", decoded.message.command, decoded.message.payload);
      } else if (decoded.error) {
        // TODO(Kagami): Wrap it in custom error class?
        // TODO(Kagami): Send `error` message and ban node for some time
        // if there were too many errors?
        self.emit("warning", new Error(
          "Message decoding error: " + decoded.error.message
        ));
      }
    }
  });

  // High-level message processing.
  self.on("message", function(command, payload) {
    var version;
    if (!established) {
      if (command === "version") {
        if (verackSent) {
          return;
        }
        try {
          version = self._decodeVersion(payload, {network: true});
        } catch(err) {
          self.emit("error", err);
          return client.end();
        }
        self.send("verack");
        verackSent = true;
        if (incoming) {
          self._sendVersion();
        } else if (verackReceived) {
          self.emit("established", version);
        }
      } else if (command === "verack") {
        verackReceived = true;
        if (verackSent) {
          self.emit("established", version);
        }
      }
    }
  });

  self.on("established", function() {
    established = true;
    // Raise timeout up to 10 minutes per spec.
    // TODO(Kagami): Send pong messages every 5 minutes as PyBitmessage.
    client.setTimeout(600000);
  });

  client.on("timeout", function() {
    client.end();
  });

  client.on("error", function(err) {
    self.emit("error", err);
  });

  client.on("close", function() {
    self.emit("close");
    delete self._client;
  });
};

function resolveDnsSeed(seed) {
  var host = seed[0];
  var port = seed[1];
  var nodes = [];
  // NOTE(Kagami):
  // 1) Node's `getaddrinfo` (`dns.lookup`) returns only one address so
  // we can't use it.
  // 2) Node's `dig host any` (`dns.resolve`) doesn't return type of the
  // record! So we resolve twice for A and AAAA.
  // 3) We ignore any errors here, promise's result is always a list.
  return new PPromise(function(resolve) {
    dns.resolve4(host, function(err, nodes4) {
      if (!err) {
        nodes4.forEach(function(n) {
          nodes.push([n, port]);
        });
      }
      dns.resolve6(host, function(err, nodes6) {
        if (!err) {
          nodes6.forEach(function(n) {
            nodes.push([n, port]);
          });
        }
        resolve(nodes);
      });
    });
  });
}

TcpTransport.prototype.bootstrap = function() {
  var hardcodedNodes = this.seeds;
  // FIXME(Kagami): Filter incorrect/private IP range nodes?
  // See also: <https://github.com/Bitmessage/PyBitmessage/issues/768>.
  return this.bootstrapDns().then(function(dnsNodes) {
    // Add hardcoded nodes to the end of list because DNS nodes should
    // be more up-to-date.
    return dnsNodes.concat(hardcodedNodes);
  });
};

/**
 * Do only DNS-specific bootstrap.
 * @return {Promise.<Array>} Discovered seed nodes.
 */
TcpTransport.prototype.bootstrapDns = function() {
  var promises = this.dnsSeeds.map(resolveDnsSeed);
  return PPromise.all(promises).then(function(dnsNodes) {
    // Flatten array of arrays.
    return Array.prototype.concat.apply([], dnsNodes);
  });
};

/**
 * Connect to a TCP node. Connection arguments are the same as for
 * [net.connect](http://nodejs.org/api/net.html#net_net_connect_port_host_connectlistener).
 */
TcpTransport.prototype.connect = function() {
  assert(!this._client, "Already connected");
  assert(!this._server, "Already listening");
  this._setupClient(net.connect.apply(null, arguments));
};

/**
 * Listen for incoming TCP connections. Listen arguments are the same as
 * for
 * [server.listen](http://nodejs.org/api/net.html#net_server_listen_port_host_backlog_callback).
 */
TcpTransport.prototype.listen = function() {
  assert(!this._client, "Already connected");
  assert(!this._server, "Already listening");

  var self = this;
  var server = self._server = net.createServer();
  server.listen.apply(server, arguments);

  var clientIdCounter = 0;

  server.on("connection", function(client) {
    var id = client.id = clientIdCounter++;
    self._clients[id] = client;
    client.on("close", function() {
      delete self._clients[id];
    });
    var opts = objectAssign({}, self);
    delete opts._server;
    var transport = new self.constructor(opts);
    var incoming = true;
    transport._setupClient(client, incoming);
    var addr = client.remoteAddress;
    var port = client.remotePort;
    self.emit("connection", transport, unmap(addr), port);
  });

  server.on("error", function(err) {
    self.emit("error", err);
  });

  server.on("close", function() {
    self.emit("close");
    delete self._server;
  });
};

TcpTransport.prototype.send = function() {
  if (this._client) {
    this._client.write(getmsg(arguments));
  } else {
    throw new Error("Not connected");
  }
};

TcpTransport.prototype.broadcast = function() {
  var data = getmsg(arguments);
  if (this._server) {
    Object.keys(this._clients).forEach(function(id) {
      this._clients[id].write(data);
    }, this);
  } else {
    throw new Error("Not listening");
  }
};

TcpTransport.prototype.close = function() {
  if (this._client) {
    this._client.end();
  } else if (this._server) {
    Object.keys(this._clients).forEach(function(id) {
      this._clients[id].end();
    }, this);
    this._server.close();
  }
};

module.exports = TcpTransport;