Source: services/redis.js

'use strict';

const redis = require('redis');
const Service = require('../types/service');
const Message = require('../types/message');

/**
 * Connect and subscribe to ZeroMQ servers.
 */
class Redis extends Service {
  /**
   * Creates an instance of a ZeroMQ subscriber.
   * @param {Object} [settings] Settings for the Redis connection.
   * @param {String} [settings.host] Host for the Redis server.
   * @param {Number} [settings.port] Remote ZeroMQ service port.
   * @returns {Redis} Instance of the Redis service, ready to run `start()`
   */
  constructor (settings = {}) {
    super(settings);

    // Assign settings over the defaults
    // NOTE: switch to lodash.merge if clobbering defaults
    this.settings = Object.assign({
      host: 'localhost',
      port: 6379,
      subscriptions: []
    }, settings);

    this.socket = null;
    this._state = { status: 'STOPPED' };

    return this;
  }

  /**
   * Opens the connection and subscribes to the requested channels.
   * @returns {Redis} Instance of the service.
   */
  async start () {
    const self = this;

    this.socket = redis.createClient(this.settings);
    this.socket.on('error', function _handleSocketError (error) {
      self.emit('error', `Redis socket error: ${error}`);
    });

    this.socket.on('message', function _handleSocketMessage (topic, message) {
      const path = `channels/${topic.toString()}`;
      self.emit('debug', `Redis message @ [${path}] (${message.length} bytes) ⇒ ${message.toString('hex')}`);
      self.emit('message', Message.fromVector(['Generic', {
        topic: topic.toString(),
        message: message.toString('hex'),
        encoding: 'hex'
      }]).toObject());
    });

    for (let i = 0; i < this.settings.subscriptions.length; i++) {
      this.subscribe(this.settings.subscriptions[i]);
    }

    this.status = 'STARTED';
    this.emit('ready', {
      id: this.id
    });

    return this;
  }

  /**
   * Closes the connection to the Redis server.
   * @returns {Redis} Instance of the service.
   */
  async stop () {
    this.status = 'STOPPING';
    this.socket.close();
    this.status = 'STOPPED';
    return this;
  }

  subscribe (name) {
    this.socket.subscribe(name);
  }
}

module.exports = Redis;