'use strict';
// Dependencies
const zeromq = require('zeromq/v5-compat');
// Fabric Types
const Service = require('../types/service');
const Message = require('../types/message');
/**
* Connect and subscribe to ZeroMQ publishers.
*/
class ZMQ extends Service {
/**
* Creates an instance of a ZeroMQ subscriber.
* @param {Object} [settings] Settings for the ZMQ connection.
* @param {String} [settings.host] Host for the ZMQ publisher.
* @param {Number} [settings.port] Remote ZeroMQ service port.
* @returns {ZMQ} Instance of the ZMQ service, ready to run `start()`
*/
constructor (settings = {}) {
super(settings);
// Assign settings over the defaults
// NOTE: switch to lodash.merge if clobbering defaults
this.settings = Object.assign({
host: 'localhost',
port: 29000,
subscriptions: [
'hashblock',
'rawblock',
'sequence'
]
}, settings);
this.socket = null;
this._state = { status: 'STOPPED' };
return this;
}
/**
* Opens the connection and subscribes to the requested channels.
* @returns {ZMQ} Instance of the service.
*/
async start () {
const self = this;
this.socket = zeromq.socket('sub');
this.socket.connect(`tcp://${this.settings.host}:${this.settings.port}`);
this.socket.on('message', function _handleSocketMessage (topic, message) {
const path = `channels/${topic.toString()}`;
self.emit('debug', `ZMQ message @ [${path}] (${message.length} bytes) ⇒ ${message.toString('hex')}`);
self.emit('message', Message.fromVector(['Generic', {
topic: topic.toString(),
message: message.toString('hex'),
encoding: 'hex'
}]).toObject());
});
for (let i = 0; i < this.settings.subscriptions.length; i++) {
this.subscribe(this.settings.subscriptions[i]);
}
this.status = 'STARTED';
this.emit('ready');
return this;
}
/**
* Closes the connection to the ZMQ publisher.
* @returns {ZMQ} Instance of the service.
*/
async stop () {
this.status = 'STOPPING';
this.socket.close();
this.status = 'STOPPED';
return this;
}
subscribe (name) {
this.socket.subscribe(name);
}
}
module.exports = ZMQ;