plugins/queue.js

'use strict';

/**
 * @class
 * @description Optional queue plugin. Queues commands sent to transport and sends one at a time
 */

class Queue {
	/**
	 * @constructor
	 * @param {Object} opts General settings for the Queue
	 * @param {String} opts.success The prefix to watch for that indicates the command succeeded
	 * @param {String} [opts.error] The prefix to watch for that indicates the command failed
	 * @param {Adapter} adapter Adapter for which this is a plugin
	 */
	constructor(opts, adapter) {
		this._opts = opts;
		this._adapter = adapter;
		this._queue = [];
		// Command currently awaiting its reply, or null when the queue is idle.
		this._current = null;
		// Chunks received for the current command before its success/error marker.
		this._data = [];
		this._format = this._adapter.opts.format;
	}

	/**
	 * @description Adds a command to the queue. (Also processes next item from queue)
	 * @param {Object|String|undefined} address The address of the device the command belongs to
	 * @param {Command} command The command to send
	 * @param {Object} [scope] Context to use for the onReceived handler
	 * @returns {Promise} Resolves with the chunks collected before the success
	 * marker (or null when there were none). Rejects with `{ error, data }` when
	 * the error marker is received, or with the send failure when the bridge
	 * cannot send the command.
	 */
	add(address, command, scope) {
		return new Promise((resolve, reject) => {
			const item = {
				onReceived(error, response) {
					if (error) {
						// Rejection shape kept as-is for existing callers.
						reject({ error: new Error(error), data: response });
					} else {
						resolve(response);
					}
				},
				onProcessed: () => {
					let sending;
					try {
						sending = this._adapter.bridge.send(address, command);
					} catch (e) {
						// Treat a synchronous throw like an asynchronous send failure.
						sending = Promise.reject(e);
					}

					Promise.resolve(sending).catch((e) => {
						reject(e);

						// A command that was never sent will never be answered:
						// release the queue so the following commands are not
						// blocked forever. The identity check guards against a
						// late rejection arriving after the queue has moved on.
						if (this._current === item) {
							this._current = null;
							this._data = [];
							setImmediate(() => {
								this.process();
							});
						}
					});
				},
				address,
				command,
				scope
			};

			this._queue.push(item);
			this.process();
		});
	}

	/**
	 * @description Process received data from transport
	 * @param {Buffer} data The data received
	 * @private
	 * @returns {Boolean} True if queue handled the data, false when no command
	 * is waiting for a reply
	 */
	received(data) {
		const logging = this._adapter.bridge.k4.logging;

		logging.debug('Queue received', data.toString(this._format));

		const current = this._current;

		if (!current) {
			// Nothing is in flight, so this data does not belong to the queue.
			return false;
		}

		const matchedSuccess = data.indexOf(this._opts.success) > -1;
		const matchedError = Boolean(this._opts.error) && data.indexOf(this._opts.error) > -1;

		if (!matchedSuccess && !matchedError) {
			// we are waiting, but success or error hasnt matched yet - add data to data array
			this._data.push(data);
			return true;
		}

		const reply = (this._data.length > 0) ? this._data : null;

		// Success takes precedence when both markers match the same chunk; the
		// promise could only be settled once anyway.
		if (matchedSuccess) {
			logging.debug('Queue data matched ', this._opts.success);
			current.onReceived.apply(current.scope, [null, reply]);
		} else {
			logging.debug('Queue data matched ', this._opts.error);
			current.onReceived.apply(current.scope, [data, reply]);
		}

		// either success or error handler matched, cleanup current and process next if available
		logging.debug('Queue handled', data.toString(this._format));
		this._current = null;
		// Fresh array so the reply handed to the caller is never mutated later.
		this._data = [];

		setImmediate(() => {
			this.process();
		});

		return true;
	}

	/**
	 * @description Send next item from queue. Does nothing when the queue is
	 * empty or a command is still waiting for its reply.
	 * @private
	 */
	process() {
		this._adapter.bridge.k4.logging.debug('Queue process. Length: ', this._queue.length);

		if (this._queue.length === 0 || this._current) {
			return;
		}

		this._current = this._queue.shift();
		this._data = [];

		// NOTE: Perform tasks for this enqueued item that should be
		// executed at the point where the item is processed, which is here.
		// Currently, the task of the onProcessed handler registered for
		// this enqueued item is to relay a command to be sent by the bridge.
		this._current.onProcessed.apply(this);
	}
}

module.exports = Queue;