'use strict';
const net = require('net');
const crypto = require('crypto');
const stream = require('stream');
const Key = require('./key');
const Machine = require('./machine');
const Message = require('./message');
const Scribe = require('./scribe');
// TODO: implement the noise protocol: http://noiseprotocol.org/noise.html
const P2P_IDENT_REQUEST = 0x01; // 1, or the identity
const P2P_IDENT_RESPONSE = 0x11;
const P2P_ROOT = 0x00000000;
const P2P_PING = 0x00000012; // same ID as Lightning (18)
const P2P_PONG = 0x00000013; // same ID as Lightning (19)
const P2P_INSTRUCTION = 0x00000020; // TODO: select w/ no overlap
const P2P_BASE_MESSAGE = 0x00000031; // TODO: select w/ no overlap
const P2P_STATE_COMMITTMENT = 0x00000032; // TODO: select w/ no overlap
const P2P_STATE_CHANGE = 0x00000033; // TODO: select w/ no overlap
const ZERO_LENGTH_PLAINTEXT = '';
/**
* An in-memory representation of a node in our network.
*/
class Peer extends Scribe {
/**
* Create an instance of {@link Peer}.
* @param {Vector} config - Initialization Vector for this peer.
*/
constructor (config) {
super(config);
this.name = 'Peer';
this.config = Object.assign({
address: '0.0.0.0',
port: 7777
}, config || {});
this.stream = new stream.Transform({
transform (chunk, encoding, callback) {
// TODO: parse as encrypted data
callback(null, chunk);
}
});
// TODO: attempt to use handler binding
// probably bug with this vs. self
// this.stream.on('data', this._handler.bind(this));
this.key = this.config.key || new Key();
this.hex = this.key.public.encodeCompressed('hex');
this.pkh = crypto.createHash('sha256').update(this.hex).digest('hex');
this.address = this.config.address;
this.port = this.config.port;
this.connections = {};
this.peers = {};
this.memory = {};
this.messages = new Set();
this.machine = new Machine();
return this;
}
start () {
this.log('Peer starting...');
if (!this.server) {
this.listen();
}
}
_connect (address) {
let self = this;
let parts = address.split(':');
let known = Object.keys(self.connections);
if (parts.length !== 2) return self.debug('Invalid address:', address);
if (known.includes(address)) return self.connections[address];
// TODO: refactor to use local functions + specific unbindings
try {
self.connections[address] = new net.Socket();
self.connections[address].on('error', function (err) {
self.debug('[PEER]', `could not connect to peer ${address} — Reason:`, err);
});
self.connections[address].on('close', function (err) {
if (err) self.debug('socket closed on error:', err);
self.connections[address].removeAllListeners();
// TODO: consider using `process.nextTick` to only clean up after event?
delete self.connections[address];
self.emit('connections:close', {
address: address
});
});
// TODO: unify as _dataHandler
self.connections[address].on('data', async function (data) {
let message = self._parseMessage(data);
// disconnect from any peer sending invalid messages
if (!message) this.destroy();
let response = await self._handleMessage({
origin: address,
message: message
});
if (response) {
this.write(response.asRaw());
}
});
// TODO: replace with handshake
self.connections[address].connect(parts[1], parts[0], function () {
self.emit('connections:open', {
address: address,
status: 'unauthenticated',
initiator: true
});
self.log(`connection to ${address} established!`);
// TODO: check peer ID, eject if self or known
let message = Message.fromVector([P2P_IDENT_REQUEST, self.id]);
self.connections[address].write(message.asRaw());
});
} catch (E) {
self.log('[PEER]', 'failed to connect:', E);
}
return self.connections[address];
}
_parseMessage (data) {
if (!data) return false;
let self = this;
let message = null;
try {
message = Message.fromRaw(data);
} catch (E) {
self.debug('[PEER]', 'error parsing message:', E);
}
return message;
}
_handleConnection (socket) {
let self = this;
let address = [socket.remoteAddress, socket.remotePort].join(':');
self.log('incoming connection:', address);
self.emit('connections:open', {
address: address,
status: 'connected',
initiator: false
});
socket.on('close', function terminate () {
self.log('connection closed:', address);
self.emit('connections:close', { address: address });
});
// TODO: unify as _dataHandler
socket.on('data', async function (data) {
let message = self._parseMessage(data);
// disconnect from any peer sending invalid messages
if (!message) this.destroy();
let response = await self._handleMessage({
origin: address,
message: message
});
if (response) {
this.write(response.asRaw());
}
});
// add this socket to the list of known connections
this.connections[address] = socket;
}
_registerPeer (peer) {
let self = this;
if (!peer) return false;
if (!peer.id) {
self.log(`Peer attribute 'id' is required.`);
return false;
}
self.peers[peer.id] = peer;
self.emit('peer', peer);
return true;
}
async _handleMessage (packet) {
if (!packet) return false;
let self = this;
let response = null;
let message = packet.message;
if (!message) return self.error('Hard failure:', packet);
if (self.messages.has(message.id)) {
self.log('received duplicate message:', message);
return false;
} else {
self.memory[message.id] = message;
self.messages.add(message.id);
}
switch (message.type) {
default:
self.log('[PEER]', `unhandled message type "${message.type}"`);
break;
case P2P_IDENT_REQUEST:
self.log('message was an identity request. sending node id...');
response = Message.fromVector([P2P_IDENT_RESPONSE, self.id]);
break;
case P2P_IDENT_RESPONSE:
self.log('message was an identity response! registering peer:', message.data);
if (!self.peers[message.data]) {
let peer = {
id: message.data,
address: packet.origin
};
self._registerPeer(peer);
}
break;
case P2P_BASE_MESSAGE:
self._handleBasePacket(packet);
break;
case P2P_ROOT:
response = Message.fromVector([P2P_STATE_COMMITTMENT, self.state]);
self.log('type was ROOT, sending state root:', response);
self.log('type was ROOT, state was:', self.state);
break;
case P2P_PING:
response = Message.fromVector([P2P_PONG, message.id]);
self.log(`type was PING (${message.id}), sending PONG:`, response);
break;
case P2P_INSTRUCTION:
// TODO: use Fabric.Script / Fabric.Machine
let stack = message.data.split(' ');
switch (stack[1]) {
case 'SIGN':
let signature = self.key._sign(stack[0]);
let buffer = Buffer.from(signature);
let script = [buffer.toString('hex'), 'CHECKSIG'].join(' ');
response = Message.fromVector([P2P_INSTRUCTION, script]);
break;
default:
self.log('[PEER]', `unhandled instruction "${stack[1]}"`);
break;
}
break;
}
return response;
}
_handleBasePacket (packet) {
let message = null;
try {
message = JSON.parse(packet.message.data);
} catch (E) {
return this.log('Error parsing message:', E);
}
switch (message.type) {
default:
this.log('unhandled base packet type:', message.type);
break;
case 'collections:post':
this.emit('collections:post', message.data);
break;
}
}
broadcast (message) {
if (typeof message !== 'string') message = JSON.stringify(message);
let id = crypto.createHash('sha256').update(message).digest('hex');
if (this.messages.has(id)) {
this.log('attempted to broadcast duplicate message');
return false;
} else {
this.memory[id] = message;
this.messages.add(id);
}
for (let id in this.peers) {
let peer = this.peers[id];
this.log('creating message for:', peer);
// TODO: select type byte for state updates
let msg = Message.fromVector([P2P_BASE_MESSAGE, message]);
this.connections[peer.address].write(msg.asRaw());
}
}
_broadcastTypedMessage (type, message) {
if (!message) message = '';
if (typeof message !== 'string') message = JSON.stringify(message);
let id = crypto.createHash('sha256').update(message).digest('hex');
if (this.messages.has(id)) {
this.log('attempted to broadcast duplicate message');
return false;
} else {
this.memory[id] = message;
this.messages.add(id);
}
for (let id in this.peers) {
let peer = this.peers[id];
this.log('creating message for:', peer);
// TODO: select type byte for state updates
let msg = Message.fromVector([type, message]);
this.connections[peer.address].write(msg.asRaw());
}
}
/**
* Start listening for connections.
* @fires Peer#ready
* @return {Peer} Chainable method.
*/
listen () {
let self = this;
self.server = net.createServer(self._handleConnection.bind(self));
self.server.listen(self.config.port, self.config.address, function () {
if (self.config.debug) {
self.log('[PEER]', `${self.id} now listening on tcp://${self.address}:${self.port}`);
}
self.emit('ready', {
id: self.id
});
});
return self;
}
}
module.exports = Peer;