'use strict';
/**
* @class
* @description Optional queue plugin. Queues commands sent to transport and sends one at a time
*/
class Queue {
/**
* @constructor
* @param {Object} opts General settings for the Queue
* @param {String} opts.success The prefix to watch for that indicates the command succeeded
* @param {String} opts.error The prefix to watch for that indicates the command failed
* @param {Adapter} adapter Adapter for which this is a plugin
*/
constructor(opts, adapter) {
this._opts = opts;
this._adapter = adapter;
this._queue = [];
this._format = this._adapter.opts.format;
}
/**
* @description Adds a command to the queue. (Also processes next item from queue)
* @param {Object|String|undefined} address The address of the device the command belongs to
* @param {Command} command The command to send
* @param {Object} [scope] Context to use for the onReceived handler
* @returns {Promise}
*/
add(address, command, scope) {
const thisQueueInstance = this;
return new Promise((resolve, reject) => {
this._queue.push({
onReceived: function (error, response) {
if (error) {
reject({ error: new Error(error), data: response });
} else {
resolve(response);
}
},
onProcessed: function () {
// NOTE:context here is this instance of Queue
thisQueueInstance._adapter.bridge.send(address, command)
.catch(e => reject(e));
},
address: address,
command: command,
scope: scope
});
this.process();
});
}
/**
* @description Process received data from transport
* @param {Buffer} data The data received
* @private
* @returns {Boolean} True if queue handled the data
*/
received(data) {
let handled = false;
this._adapter.bridge.k4.logging.debug('Queue received', data.toString(this._format));
// if the data contains the success string, call onReceived callback
if (data.indexOf(this._opts.success) > -1) {
this._adapter.bridge.k4.logging.debug('Queue data matched ', this._opts.success);
const reply = (this._data.length > 0) ? this._data : null;
this._current.onReceived.apply(this._current.scope, [null, reply]);
handled = true;
}
// if the data contains the optional error string, call onReceived
// callback
if (this._opts.error && data.indexOf(this._opts.error) > -1) {
this._adapter.bridge.k4.logging.debug('Queue data matched ', this._opts.error);
const reply = (this._data.length > 0) ? this._data : null;
this._current.onReceived.apply(this._current.scope, [data, reply]);
handled = true;
}
if (handled) {
// either success or error handler matched, cleanup current and process next if available
this._adapter.bridge.k4.logging.debug('Queue handled', data.toString(this._format));
this._current = null;
setImmediate(() => {
this.process();
});
return true;
}
if (this._current) {
// we are waiting, but success or error hasnt matched yet - add data to data array
this._data.push(data);
return true;
}
}
/**
* @description Send next item from queue
* @private
*/
process() {
this._adapter.bridge.k4.logging.debug('Queue process. Length: ', this._queue.length);
if (this._queue.length === 0) {
return;
}
if (this._current) {
return;
}
this._current = this._queue.shift();
this._data = [];
// NOTE: Perform tasks for this enqueued item that should be
// executed at the point where the item is processed, which is here.
// Currently, the task of the onProcessed handler registered for
// this enqueued item is to relay a command to be sent by the bridge.
this._current.onProcessed.apply(this);
}
};
module.exports = Queue;