Source: types/reader.js

'use strict';

const {
  HEADER_SIZE,
  MAGIC_BYTES,
  MAX_MESSAGE_SIZE
} = require('../constants');

const merge = require('lodash.merge');
const EventEmitter = require('events').EventEmitter;
const Message = require('./message');

/**
 * Read from a byte stream, seeking valid Fabric messages.
 */
class Reader extends EventEmitter {
  /**
   * Create an instance of a {@link Reader}, which can listen to a byte stream
   * for valid Fabric messages.
   * @param {Object} settings Settings for the stream.
   * @returns {Reader}
   */
  constructor (settings = {}) {
    super(settings);

    this.settings = merge({
      constraints: {
        frame: {
          size: MAX_MESSAGE_SIZE
        }
      }
    }, settings);

    this.queue = [];
    this.frame = Buffer.alloc(MAX_MESSAGE_SIZE);

    return this;
  }

  get _bufferedBytes () {
    return this.queue.length;
  }

  _addData (data) {
    for (let i = 0; i < data.length; i++) {
      this.queue.push(data[i]);
    }

    this._readFabricFrame();
  }

  _addBytes (data) {
    for (let i = 0; i < data.length; i++) {
      this.queue.push(data[i]);
    }
  }

  _readBytes (count) {
    const bytes = [];

    for (let i = 0; i < count; i++) {
      bytes.push(this.queue[i]);
    }

    return bytes;
  }

  _takeBytes (count) {
    const bytes = [];

    for (let i = 0; i < count; i++) {
      bytes.push(this.queue.shift());
    }

    return bytes;
  }

  _promiseBytes (count = 1) {
    const self = this;
    return new Promise((resolve, reject) => {
      const bytes = self._readBytes(count);
      return resolve(bytes);
    });
  }

  _readFrame (size = 1) {
    const bytes = this._readBytes(size);
    this.emit('candidate', bytes);
    return bytes;
  }

  _readFabricFrame () {
    // Ensure we have at least a full message header
    if (this._bufferedBytes < HEADER_SIZE) return;

    // Read up to HEADER_SIZE bytes
    const header = this._readBytes(HEADER_SIZE);
    const parts = [];

    // Segment the header bytes
    parts.push(header.slice(0, 4)); // magic
    parts.push(header.slice(4, 8)); // version
    parts.push(header.slice(8, 40)); // parent
    parts.push(header.slice(40, 72)); // author
    parts.push(header.slice(72, 76)); // type
    parts.push(header.slice(76, 80)); // payload size
    parts.push(header.slice(80, 112)); // hash
    parts.push(header.slice(112, HEADER_SIZE)); // signature

    const map = parts.map((x) => Buffer.from(x, 'hex'));
    const elements = map.map((x) => parseInt(x.toString('hex'), 16));

    // Read header
    const magic = elements[0];
    const version = elements[1];
    const parent = elements[2];
    const author = elements[3];
    const type = elements[4];
    const size = elements[5];
    const signature = elements[6];
    const hash = elements[7];

    if (magic !== MAGIC_BYTES) {
      throw new Error(`Header not magic: ${magic} !== ${MAGIC_BYTES}`);
    }

    // Defer to next call (not enough data)
    if (this._bufferedBytes < HEADER_SIZE + size) return;

    // Take extra data
    const data = this._takeBytes(HEADER_SIZE + size);
    const frame = Buffer.from(data, 'hex');

    // Provide data for debugger
    const proposal = {
      magic,
      version,
      parent,
      author,
      type,
      size,
      hash,
      signature,
      data
    };

    // this.emit('debug', `Reader Proposal: ${JSON.stringify(proposal, null, '  ')}`);
    this.emit('message', frame);
  }
}

module.exports = Reader;