Source: types/peer.js

'use strict';

// Constants
const {
  FABRIC_KEY_DERIVATION_PATH,
  P2P_IDENT_REQUEST,
  P2P_IDENT_RESPONSE,
  P2P_ROOT,
  P2P_PING,
  P2P_PONG,
  P2P_PORT,
  P2P_START_CHAIN,
  P2P_INSTRUCTION,
  P2P_BASE_MESSAGE,
  P2P_STATE_COMMITTMENT,
  P2P_STATE_CHANGE,
  P2P_STATE_ROOT
} = require('../constants');

// Dependencies
const net = require('net');
const crypto = require('crypto');
const stream = require('stream');
const manager = require('fast-json-patch');
const noise = require('noise-protocol-stream');
const merge = require('lodash.merge');

// Fabric Types
const Actor = require('./actor');
const Identity = require('./identity');
const Signer = require('./signer');
const Key = require('./key');
const Machine = require('./machine');
const Message = require('./message');
const Service = require('./service');
const Wallet = require('./wallet');

// Constants
const PROLOGUE = 'FABRIC';

/**
 * An in-memory representation of a node in our network.
 */
class Peer extends Service {
  /**
   * Create an instance of {@link Peer}.
   * @param {Object} [config] Initialization Vector for this peer.
   * @param {Boolean} [config.listen] Whether or not to listen for connections.
   * @param {Boolean} [config.upnp] Whether or not to use UPNP for automatic configuration.
   * @param {Number} [config.port=7777] Port to use for P2P connections.
   * @param {Array} [config.peers=[]] List of initial peers.
   */
  constructor (config = {}) {
    super(config);

    this.name = 'Peer';
    this.settings = merge({
      constraints: {
        peers: {
          max: 32,
          shuffle: 8
        }
      },
      interface: '0.0.0.0',
      interval: 60000, // 1 minute
      network: 'regtest',
      networking: true,
      listen: true,
      peers: [],
      port: 7777,
      state: Object.assign({
        actors: {},
        channels: {},
        contracts: {},
        documents: {},
        messages: {},
        services: {}
      }, config.state),
      upnp: false,
      key: {}
    }, config);

    // Network Internals
    this.upnp = null;
    this.server = net.createServer(this._NOISESocketHandler.bind(this));
    this.stream = new stream.Transform({
      transform (chunk, encoding, done) {
        done(null, chunk);
      }
    });

    this.identity = new Identity(this.settings.key);
    this.signer = new Signer(this.settings.key);
    this.key = new Key(this.settings.key);
    // this.wallet = new Wallet(this.settings.key);

    // this.hex = this.key.public.encodeCompressed('hex');
    // this.pkh = crypto.createHash('sha256').update(this.hex).digest('hex');

    // Public Details
    this.public = {
      ip: null,
      port: this.settings.port
    };

    // Internal properties
    this.actors = {};
    this.contracts = {};
    this.chains = {};
    this.candidates = [];
    this.connections = {};
    this.history = [];
    this.peers = {};
    this.mailboxes = {};
    this.memory = {};
    this.handlers = {};
    this.messages = new Set();
    this.sessions = {};

    // Internal Stack Machine
    this.machine = new Machine();
    this.observer = null;

    this.meta = {
      messages: {
        inbound: 0,
        outbound: 0
      }
    };

    this._state = {
      content: this.settings.state,
      peers: {},
      chains: {},
      connections: {},
      status: 'sleeping'
    };

    return this;
  }

  get id () {
    return this.key.pubkey;
  }

  get pubkeyhash () {
    // TODO: switch to child pubkey
    // path: m/7777'/0'/0/0
    return this.key.pubkeyhash;
  }

  /**
   * @deprecated
   */
  get address () {
    return this.settings.interface || this.settings.address;
  }

  get documentation () {
    return {
      name: 'Fabric',
      description: 'Manages connections to the Fabric Network.',
      methods: {
        ack: {
          description: 'Acknowledge a message.',
          parameters: {
            message: {
              // TODO: consider making this a FabricMessageID
              type: 'FabricMessage',
              description: 'The message to acknowledge.'
            }
          },
          returns: {
            type: 'Promise',
            description: 'A Promise which resolves to the completed FabricState.'
          }
        },
        send: {
          description: 'Send a message to a connected peer.',
          parameters: {
            message: {
              type: 'FabricMessage',
              description: 'The message to send to the peer.'
            }
          },
          returns: {
            type: 'Promise',
            description: 'A Promise which resolves to the response (if any).'
          }
        },
        broadcast: {
          description: 'Broadcast a message to all connected nodes.',
          parameters: {
            message: {
              type: 'FabricMessage',
              description: 'The message to send to the node.'
            }
          },
          returns: {
            type: 'Promise',
            description: 'A Promise which resolves to the responses (if any).'
          }
        }
      }
    }
  }

  get interface () {
    return this.settings.interface || this.settings.address;
  }

  get port () {
    return this.settings.port || 7777;
  }

  beat () {
    const initial = new Actor(this.state);
    const now = (new Date()).toISOString();

    this.commit();
    this.emit('beat', {
      created: now,
      initial: initial.toGenericMessage(),
      state: this.state
    });

    return this;
  }

  /**
   * Write a {@link Buffer} to all connected peers.
   * @param {Buffer} message Message buffer to send.
   */
  broadcast (message, origin = null) {
    for (const id in this.connections) {
      if (id === origin) continue;
      this.connections[id]._writeFabric(message);
    }
  }

  relayFrom (origin, message, socket = null) {
    for (const id in this.connections) {
      if (id === origin) continue;
      this.connections[id]._writeFabric(message.toBuffer(), socket);
    }
  }

  _beginFabricHandshake (client) {
    // Start handshake
    const vector = ['P2P_SESSION_OFFER', JSON.stringify({
      type: 'P2P_SESSION_OFFER',
      actor: {
        id: this.identity.id
      },
      object: {
        challenge: crypto.randomBytes(8).toString('hex'),
      }
    })];

    // Create offer message
    const P2P_SESSION_OFFER = Message.fromVector(vector)._setSigner(this.signer).sign();
    const message = P2P_SESSION_OFFER.toBuffer();
    if (this.settings.debug) this.emit('debug', `session_offer ${P2P_SESSION_OFFER} ${message.toString('hex')}`);

    // Send handshake
    try {
      client.encrypt.write(message);
    } catch (exception) {
      this.emit('error', `Cannot write to socket: ${exception}`);
    }

    return this;
  }

  /**
   * Open a Fabric connection to the target address and initiate the Fabric Protocol.
   * @param {String} target Target address.
   */
  _connect (target) {
    const url = new URL(`tcp://${target}`);
    const id = url.username;

    if (!url.port) target += `:${P2P_PORT}`;

    this._registerActor({ name: target });
    this._registerPeer({ identity: id });

    // Set up the NOISE socket
    const socket = net.createConnection(url.port || P2P_PORT, url.hostname);
    const client = noise({
      initiator: true,
      prologue: Buffer.from(PROLOGUE),
      // privateKey: derived.privkey,
      verify: this._verifyNOISE.bind(this)
    });

    socket.on('error', (error) => {
      this.emit('error', `Socket error: ${error}`);
    });

    socket.on('open', (info) => {
      this.emit('debug', `Socket open: ${info}`);
    });

    socket.on('close', (info) => {
      this.emit('debug', `Outbound socket closed: (${target}) ${info}`);
      socket._destroyFabric();
    });

    socket.on('end', (info) => {
      this.emit('debug', `Socket end: (${target}) ${info}`);
    });

    // Handle trusted Fabric messages
    client.decrypt.on('data', (data) => {
      this._handleFabricMessage(data, { name: target }, client);
    });

    // Start stream
    client.encrypt.pipe(socket).pipe(client.decrypt);

    // TODO: output stream
    // client.decrypt.pipe(this.stream);

    this._registerNOISEClient(target, socket, client);
    this._beginFabricHandshake(client);

    this.emit('connections:open', {
      address: target,
      id: target,
      url: url
    });
  }

  _announceAlias (alias, origin = null, socket = null) {
    const PACKET_PEER_ALIAS = Message.fromVector(['P2P_PEER_ALIAS', JSON.stringify({
      type: 'P2P_PEER_ALIAS',
      object: {
        name: alias
      }
    })]);

    const announcement = PACKET_PEER_ALIAS.toBuffer();
    this.broadcast(announcement, origin.name);
  }

  _destroyFabric (socket, target) {
    if (socket._keepalive) clearInterval(socket._keepalive);

    delete this.connections[target];
    delete this.peers[target];

    this.emit('connections:close', {
      address: target,
      name: target
    });
  }

  /**
   * Attempt to fill available connection slots with new peers.
   * @returns {Peer} Instance of the peer.
   */
  _fillPeerSlots () {
    if (this.connections.length >= this.settings.constraints.peers.max) return;
    const openCount = this.settings.constraints.peers.max - Object.keys(this.connections).length;
    for (let i = 0; i < openCount; i++) {
      if (!this.candidates.length) continue;
      const candidate = this.candidates.shift();
      // this.emit('debug', `Filling peer slot ${i} of ${openCount} (max ${this.settings.constraints.peers.max}) with candidate: ${JSON.stringify(candidate, null, '  ')}`);
      
      try {
        this._connect(`${candidate.object.host}:${candidate.object.port}`);
      } catch (exception) {
        this.emit('error', `Unable to fill open peer slot ${i}: ${exception}`);
      }

      // Place the candidate back in the list
      this.candidates.push(candidate);
    }

    return this;
  }

  /**
   * Handle a Fabric {@link Message} buffer.
   * @param {Buffer} buffer
   * @returns {Peer} Instance of the Peer.
   */
  _handleFabricMessage (buffer, origin = null, socket = null) {
    const hash = crypto.createHash('sha256').update(buffer).digest('hex');
    const message = Message.fromBuffer(buffer);
    if (this.settings.debug) this.emit('debug', `Got Fabric message: ${message}`);

    // Have we seen this message before?
    if (this.messages[hash]) {
      // this.emit('debug', `Duplicate message: ${hash}`);
      return;
    }

    // Store message for later
    this.messages[hash] = buffer.toString('hex');

    const checksum = crypto.createHash('sha256').update(message.body, 'utf8').digest('hex');
    if (checksum !== message.raw.hash.toString('hex')) throw new Error('Message received with incorrect hash.');

    // TODO: verify signatures
    // const signer = new Signer({ public: message.raw.author });
    // this.emit('debug', `Message signer: ${signer}`);
    if (this.settings.debug) this.emit('debug', `Message author: ${message.raw.signature.toString('hex')}`);
    if (this.settings.debug) this.emit('debug', `Message signature: ${message.raw.signature.toString('hex')}`);

    switch (message.type) {
      default:
        this.emit('debug', `Unhandled message type: ${message.type}`);
        break;
      case 'GenericMessage':
      case 'PeerMessage':
      case 'ChatMessage':
        // Parse JSON body
        try {
          const content = JSON.parse(message.data);
          this._handleGenericMessage(content, origin, socket);
        } catch (exception) {
          this.emit('error', `Broken content body: ${exception}`);
        }

        break;
    }

    this.commit();

    return this;
  }

  _handleGenericMessage (message, origin = null, socket = null) {
    if (this.settings.debug) this.emit('debug', `Generic message:\n\tFrom: ${JSON.stringify(origin)}\n\tType: ${message.type}\n\tBody:\n\`\`\`\n${JSON.stringify(message.object, null, '  ')}\n\`\`\``);

    // Lookup the appropriate Actor for the message's origin
    const actor = new Actor(origin);

    switch (message.type) {
      default:
        this.emit('debug', `Unhandled Generic Message: ${message.type} ${JSON.stringify(message, null, '  ')}`);
        break;
      case 'P2P_SESSION_OFFER':
        if (this.settings.debug) this.emit('debug', `Handling session offer: ${JSON.stringify(message.object)}`);
        if (this.settings.debug) this.emit('debug', `Session offer origin: ${JSON.stringify(origin)}`);
        if (this.settings.debug) this.emit('debug', `connections: ${JSON.stringify(Object.keys(this.connections))}`);

        // Peer is valid
        // TODO: remove this assumption (validate above)
        // TODO: check for existing peer, update instead of replace
        this.peers[origin.name] = {
          id: message.actor.id,
          name: origin.name,
          address: origin.name,
          connections: [ origin.name ]
        };

        // Emit peer event
        this.emit('peer', this.peers[origin.name]);

        // Send session open event
        const vector = ['P2P_SESSION_OPEN', JSON.stringify({
          type: 'P2P_SESSION_OPEN',
          object: {
            initiator: message.actor.id,
            counterparty: this.identity.id,
            solution: message.object.challenge
          }
        })];

        const PACKET_SESSION_START = Message.fromVector(vector)._setSigner(this.signer).sign();
        const reply = PACKET_SESSION_START.toBuffer();
        if (this.settings.debug) this.emit('debug', `session_start ${PACKET_SESSION_START} ${reply.toString('hex')}`);
        this.connections[origin.name]._writeFabric(reply, socket);
        break;
      case 'P2P_SESSION_OPEN':
        if (this.settings.debug) this.emit('debug', `Handling session open: ${JSON.stringify(message.object)}`);
        this.peers[origin.name] = { id: message.object.counterparty, name: origin.name, address: origin };
        this.emit('peer', this.peers[origin.name]);
        break;
      case 'P2P_CHAT_MESSAGE':
        this.emit('chat', message);
        const relay = Message.fromVector(['ChatMessage', JSON.stringify(message)])._setSigner(this.signer);
        // this.emit('debug', `Relayed chat message: ${JSON.stringify(relay.toGenericMessage())}`);
        this.relayFrom(origin.name, relay);
        break;
      case 'P2P_STATE_ANNOUNCE':
        const state = new Actor(message.object.state);
        this.emit('debug', `state_announce <Generic>${JSON.stringify(message.object || '')} ${state.toGenericMessage()}`);
        break;
      case 'P2P_PING':
        const now = (new Date()).toISOString();
        const P2P_PONG = Message.fromVector(['GENERIC', JSON.stringify({
          actor: {
            id: this.identity.id
          },
          created: now,
          type: 'P2P_PONG',
          object: {
            created: now
          }
        })]);

        this.connections[origin.name]._writeFabric(P2P_PONG.toBuffer());
        break;
      case 'P2P_PONG':
        // Update the peer's score for succesfully responding to a ping
        // TODO: ensure no pong is handled when a ping was not previously sent
        const instance = this.state.actors[actor.id] ? this.state.actors[actor.id] : {};

        this.actors[actor.id].adopt([
          { op: 'replace', path: '/score', value: (instance.score || 0) + 1 }
        ]);

        this._state.content.actors[actor.id] = this.actors[actor.id].state;
        this.commit();

        this.emit('state', this.state);
        break;
      case 'P2P_PEER_ALIAS':
        this.emit('debug', `peer_alias ${origin.name} <Generic>${JSON.stringify(message.object || '')}`);
        this.connections[origin.name]._alias = message.object.name;
        break;
      case 'P2P_PEER_ANNOUNCE':
        this.emit('debug', `peer_announce <Generic>${JSON.stringify(message.object || '')}`);
        const candidate = new Actor(message.object);
        this.candidates.push(candidate.toGenericMessage());
        // this._fillPeerSlots();

        // const announce = Message.fromVector(['PeerAnnounce', JSON.stringify(message)]);
        // this.relayFrom(origin.name, announce);
        break;
      case 'P2P_DOCUMENT_PUBLISH':
        break;
      case 'CONTRACT_PUBLISH':
        // TODO: reject and punish mis-behaving peers
        this.emit('debug', `Handling peer contract publish: ${JSON.stringify(message.object)}`);
        this._registerContract(message.object);
        break;
      case 'CONTRACT_MESSAGE':
        // TODO: reject and punish mis-behaving peers
        if (this.settings.debug) this.emit('debug', `Handling contract message: ${JSON.stringify(message.object)}`);
        if (this.settings.debug) this.emit('debug', `Contract state: ${JSON.stringify(this.state.contracts[message.object.contract])}`);
        manager.applyPatch(this._state.content.contracts[message.object.contract], message.object.ops);
        this.commit();
        break;
    }
  }

  _handleNOISEHandshake (localPrivateKey, localPublicKey, remotePublicKey) {
    this.emit('debug', `Peer transport handshake using local key: ${localPrivateKey.toString('hex')}`);
    this.emit('debug', `Peer transport handshake using local public key: ${localPublicKey.toString('hex')}`);
    this.emit('debug', `Peer transport handshake with remote public key: ${remotePublicKey.toString('hex')}`);
  }

  _NOISESocketHandler (socket) {
    const target = `${socket.remoteAddress}:${socket.remotePort}`;
    const url = `tcp://${target}`;

    // Store a unique actor for this inbound connection
    this._registerActor({ name: target });

    // Create NOISE handler
    const handler = noise({
      prologue: Buffer.from(PROLOGUE),
      // privateKey: this.identity.key.private,
      verify: this._verifyNOISE.bind(this)
    });

    // Set up NOISE event handlers
    handler.encrypt.on('handshake', this._handleNOISEHandshake.bind(this));
    handler.encrypt.on('error', (error) => {
      this.emit('error', `NOISE encrypt error: ${error}`);
    });

    handler.encrypt.on('end', (data) => {
      this.emit('debug', `Peer encrypt end: ${data}`);
    });

    handler.decrypt.on('error', (error) => {
      this.emit('error', `NOISE decrypt error: ${error}`);
    });

    handler.decrypt.on('close', (data) => {
      this.emit('debug', `Peer decrypt close: ${data}`);
    });

    handler.decrypt.on('end', (data) => {
      this.emit('debug', `Peer decrypt end: ${data}`);
    });

    handler.decrypt.on('data', (data) => {
      this._handleFabricMessage(data, { name: target });
    });

    socket._destroyFabric = () => {
      this._destroyFabric(socket, target);
    };

    socket._writeFabric = (msg) => {
      this._writeFabric(msg, handler);
    };

    // Store socket in collection
    this.connections[target] = socket;

    // Begin NOISE stream
    handler.encrypt.pipe(socket).pipe(handler.decrypt);

    this.emit('connections:open', {
      id: target,
      url: url
    });
  }

  _publishDocument (hash, rate = 0) {
    this._state.content.documents[hash] = document;

    this.commit();

    const PACKET_DOCUMENT_PUBLISH = Message.fromVector(['P2P_DOCUMENT_PUBLISH', JSON.stringify({
      type: 'P2P_DOCUMENT_PUBLISH',
      object: {
        hash: hash,
        rate: rate
      }
    })]);

    const message = PACKET_DOCUMENT_PUBLISH.toBuffer();
    if (this.settings.debug) this.emit('debug', `Broadcasting document publish: ${message.toString('utf8')}`);
    this.broadcast(message);
  }

  _registerActor (object) {
    this.emit('debug', `Registering actor: ${JSON.stringify(object, null, '  ')}`);
    const actor = new Actor(object);

    if (this.actors[actor.id]) return this;

    this.actors[actor.id] = actor;
    this.commit();
    this.emit('actorset', this.actors);

    return this;
  }

  _registerContract (object) {
    this.emit('debug', `Registering contract: ${JSON.stringify(object, null, '  ')}`);
    const actor = new Actor(object);

    if (this.contracts[actor.id]) return this;

    this.contracts[actor.id] = actor;
    this._state.content.contracts[actor.id] = object.state;

    this.commit();
    this.emit('contractset', this.contracts);

    return this;
  }

  _registerNOISEClient (name, socket, client) {
    // Assign socket properties
    // Failure counter
    socket._failureCount = 0;
    socket._lastMessage = null;
    socket._messageLog = [];

    // Enable keepalive
    socket._keepalive = setInterval(() => {
      const now = (new Date()).toISOString();
      const P2P_PING = Message.fromVector(['GENERIC', JSON.stringify({
        actor: {
          id: this.identity.id
        },
        created: now,
        type: 'P2P_PING',
        object: {
          created: now
        }
      })]);

      try {
        client.encrypt.write(P2P_PING.toBuffer());
      } catch (exception) {
        this.emit('debug', `Cannot write ping: ${exception}`)
      }
    }, 60000);

    // TODO: reconcile APIs for these methods
    // Map destroy function
    socket._destroyFabric = () => {
      this._destroyFabric(socket, name);
    };

    // Map write function
    socket._writeFabric = (msg) => {
      this._writeFabric(msg, client);
    };

    this.connections[name] = socket;

    return this;
  }

  _registerPeer (data) {
    const peer = new Actor({
      type: 'Peer',
      data: data
    });

    if (this.peers[peer.id]) return this;

    this.peers[peer.id] = peer;

    return this;
  }

  _scheduleReconnect (target, when = 250) {
    this.emit('debug', `Scheduled reconnect to ${target} in ${when} milliseconds...`);
    const reconnect = setTimeout(() => {
      this._connect(target);
    }, when);
  }

  _verifyNOISE (localPrivateKey, localPublicKey, remotePublicKey, done) {
    // Is the message valid?
    if (1 === 1) {
      done(null, true);
    } else {
      done(null, false);
    }
  }

  _writeFabric (msg, stream) {
    const hash = crypto.createHash('sha256').update(msg).digest('hex');
    this.messages[hash] = msg.toString('hex');
    this.commit();
    if (stream) stream.encrypt.write(msg);
  }

  /**
   * Start the Peer.
   */
  async start () {
    let address = null;
    this.emit('log', 'Peer starting...');

    // Register self
    this._registerActor({ name: `${this.interface}:${this.port}` });

    if (this.settings.listen) {
      this.emit('log', 'Listener starting...');

      try {
        address = await this.listen();
        this.emit('log', 'Listener started!');
      } catch (exception) {
        this.emit('error', 'Could not listen:', exception);
      }
    }

    if (this.settings.networking) {
      this.emit('warning', `Networking enabled.  Connecting to peers: ${JSON.stringify(this.settings.peers)}`);
      for (const candidate of this.settings.peers) {
        this._connect(candidate);
      }
    }

    if (this.settings.debug) this.emit('debug', `Observing state...`);

    try {
      this.observer = manager.observe(this._state.content);
    } catch (exception) {
      this.emit('error', `Could not observe state: ${exception}`);
    }

    await this._startHeart();

    if (this.settings.debug) this.emit('debug', `Peer ready!  State: ${JSON.stringify(this.state, null, '  ')}`);

    this.emit('ready', {
      id: this.id,
      address: address,
      pubkey: this.key.pubkey
    });

    if (this.settings.debug) this.emit('debug', `Peer started!`);

    /*
    const PACKET_PEER_ANNOUNCE = Message.fromVector(['P2P_PEER_ANNOUNCE', JSON.stringify({
      type: 'P2P_PEER_ANNOUNCE',
      object: {
        host: this._externalIP,
        port: this.settings.port
      }
    })])._setSigner(this.signer).sign();
    const announcement = PACKET_PEER_ANNOUNCE.toBuffer();
    // this.emit('debug', `Announcing peer: ${announcement.toString('utf8')}`);
    this.connections[origin.name]._writeFabric(announcement, socket);
    */

    return this;
  }

  /**
   * Stop the peer.
   */
  async stop () {
    // Alert listeners
    this.emit('log', 'Peer stopping...');
    this._state.status = 'STOPPING';

    // Stop the heart
    if (this._heart) clearInterval(this._heart);

    this.emit('debug', 'Closing all connections...');
    for (const id in this.connections) {
      this.connections[id].destroy();
    }

    const terminator = async () => {
      return new Promise((resolve, reject) => {
        if (!this.server.address()) return resolve();
        return this.server.close(function serverClosed (error) {
          if (error) return reject(error);
          resolve();
        });
      });
    }

    this.emit('debug', 'Closing network...');
    await terminator();

    this._state.status = 'STOPPED';
    this.commit();

    this.emit('log', 'Peer stopped!');

    return this;
  }

  async _setState (value) {
    if (!value) return new Error('You must provide a State to set the value to.');
    this._state.content = value;
    return this.state;
  }

  _disconnect (address) {
    if (!this.connections[address]) return false;

    // Halt any heartbeat
    if (this.connections[address].heartbeat) {
      clearInterval(this.connections[address].heartbeat);
    }

    // Destroy the connection
    // this.connections[address].destroy();

    // Remove connection from map
    delete this.connections[address];
  }

  _maintainConnection (address) {
    const peer = this;
    if (!peer.connections[address]) return new Error(`Connection for address "${address}" does not exist.`);
    /* peer.connections[address]._player = setInterval(function () {
      peer._pingConnection.apply(peer, [ address ]);
    }, 60000); */
  }

  _pingConnection (address) {
    const ping = Message.fromVector(['Ping', `${Date.now().toString()}`]);

    try {
      this.sendToSocket(address, ping);
    } catch (exception) {
      this.emit('error', `Couldn't deliver message to socket: ${exception}`);
    }
  }

  _updateLiveness (address) {
    // Return Error if no connection
    if (!this.connections[address]) {
      const error = `No connection for address: ${address}`;
      this.emit('error', error);
      return new Error(error);
    }

    // Set the _lastMessage property
    this.connections[address]._lastMessage = Date.now();

    // Make chainable
    return this;
  }

  _registerHandler (type, method) {
    if (this.handlers[type]) return new Error(`Handler for method "${type}" is already registered.`);
    this.handlers[type] = method.bind(this);
    return this.handlers[type];
  }

  async _requestStateFromAllPeers () {
    const message = Message.fromVector(['StateRequest']);
    this.broadcast(message);
  }

  /**
   * Start listening for connections.
   * @return {Peer} Chainable method.
   */
  async listen () {
    return new Promise((resolve, reject) => {
      this.server.listen(this.port, this.interface, (error) => {
        if (error) return reject(error);

        const details = this.server.address();
        const address = `${details.address}:${details.port}`;

        this.emit('log', `Now listening on tcp://${address} [!!!]`);
        return resolve(address);
      });

      this.server.on('error', (error) => {
        this.emit('error', `Server socket error: ${error}`);
      });
    });
  }
}

module.exports = Peer;