'use strict';
const Chance = require('chance');
const {default: PromiseQueue} = require('p-queue');
const _isNil = require('lodash/isNil');
const _isEqual = require('lodash/isEqual');
const _spread = require('lodash/spread');
const _has = require('lodash/has');
const crypto = require('crypto');
const BigNumber = require('bignumber.js');
const PromiseSeries = require('p-series');
const events = require('events');
const debug = require('debug')('k4-base-bridge:pollingManager');
const CronTask = require('cron').CronJob;
const momentTimezone = require('moment-timezone');
let TimingUtility;
let MappingChangeMonitor;
let ModelChangeMonitor;
/**
* @class
* @description Plugin to orchestrate polling device variables on
* a certain schedule
*/
/**
* @typedef {Array} Tuple
*/
class PollingManager {
/**
* @constructor
* @param {Object} [opts] General settings for PollingManager
* @param {Adapter} adapter Adapter for which this is a plugin
*/
constructor(opts, adapter) {
this._opts = opts;
this.adapter = adapter;
this._initCloseQueue = new PromiseQueue({
concurrency: 1,
autoStart: true
});
this._shouldReinitializeOnBridgeReady = false;
if (this._opts &&
this._opts.shouldReinitializeOnBridgeReady === true) {
this._shouldReinitializeOnBridgeReady = true;
}
this.emitter = new events.EventEmitter();
this._defaultPollSlot = null;
TimingUtility = this._adapter.bridge.classes.TimingUtility;
MappingChangeMonitor = this._adapter.bridge.classes.MappingChangeMonitor;
ModelChangeMonitor = this._adapter.bridge.classes.ModelChangeMonitor;
this.mappingChangeMonitor = new MappingChangeMonitor(adapter);
this.modelChangeMonitor = new ModelChangeMonitor(adapter);
}
/**
* @description Flag derived in the constructor: true only when
* `opts.shouldReinitializeOnBridgeReady` was strictly `true`.
* NOTE(review): presumably consulted by the adapter/bridge to decide
* whether to re-run `init` once the bridge is ready - confirm with caller.
* @type {Boolean}
*/
get shouldReinitializeOnBridgeReady() {
return this._shouldReinitializeOnBridgeReady;
}
/**
* @description Constants used by the polling logic. A new object is built
* on every access.
* @property {Object} constants
* @property {Object} constants.nodeStates Values a device node state is
* compared against (`poll` skips `DEAD` devices unless the mapping opts in)
* @property {Object} constants.misc
* @property {Boolean} constants.misc.PING_AFTER_DEAD_DEFAULT Default used
* by `poll` when a device mapping does not define `polling.pollAfterDead`
* @static
*/
static get constants() {
return {
nodeStates: {
ALIVE: 'alive',
DEAD: 'dead',
UNKNOWN: 'unknown'
},
misc: {
PING_AFTER_DEAD_DEFAULT: false
}
};
}
/**
* @description The number of total poll slots available. `initRoutine`
* loads this from the adapter device's `properties/pollSlots`; one extra
* ad-hoc slot is added on top of this count wherever slot durations are
* computed.
* @type {Number}
*/
set pollSlots(pollSlotsVal) {
this._pollSlots = pollSlotsVal;
}
get pollSlots() {
return this._pollSlots;
}
/**
* @description The duration of the total poll window. `initRoutine` loads
* this from the adapter device's `properties/pollWindow` and passes it to
* `createTimer` as a millisecond value.
* @type {Number}
*/
set pollWindow(pollWindowVal) {
this._pollWindow = pollWindowVal;
}
get pollWindow() {
return this._pollWindow;
}
/**
* @description The specific poll slot id/index assigned to this
* controller. `initRoutine` takes it from the adapter device's
* `properties/pollSlot` when that is not null, otherwise derives it from
* the controller MAC address via `calculatePollSlot`. `onPollSlot` only
* polls when the current slot equals this value.
* @type {Number}
*/
set pollSlot(slotNum) {
this._pollSlot = slotNum;
}
get pollSlot() {
return this._pollSlot;
}
/**
* @description The adapter instance this plugin belongs to. Backed by
* `this._adapter`; the constructor assigns it through this setter.
* @type {Adapter}
*/
set adapter(adapter) {
this._adapter = adapter;
}
get adapter() {
return this._adapter;
}
/**
* @description Index of the slot reserved for ad-hoc tasks. `onPollSlot`
* runs `processAdHocTasks` when the current slot equals this value.
* @type {Number}
*/
get AD_HOC_POLL_SLOT() {
return 0;
}
/**
* @description This method should be named as 'init' because Adapter
* instance calls the 'init' methods (if they exist) for all Adapter
* plugins
* @returns {Promise}
*/
init(options) {
debug(`[init] Starting Polling Manager:`, options);
this.havePolledThisWindow = false;
if (!_isNil(options) && !_isNil(options.force) && options.force === true) {
this._initCloseQueue.pause();
this._initCloseQueue.clear();
this._initCloseQueue.start();
}
return this._initCloseQueue.add(() => this.initRoutine());
}
async initRoutine() {
// Initialize poll interval trackers for every polled item on every
// device
await TimingUtility.setImmediatePromise();
await this.closeRoutine();
// Changes to these below values generally call for bridge reloading
// All of these values are necessarily defined.
this._pollSlots = this.adapter.k4.device.child('properties/pollSlots').value();
this._pollWindow = this.adapter.k4.device.child('properties/pollWindow').value();
// Re-initialize poll trackers
this._pollTrackersByDevice = new Map();
await TimingUtility.setTimeoutPromise(3000);
this._defaultPollSlot = this.adapter.k4.device.child('properties/pollSlot').value();
if (this._defaultPollSlot === null) {
const controllerMACAddress = this.adapter.k4.device.parent().parent().child('properties/mac').value();
this.pollSlot = this.calculatePollSlot(1, this.pollSlots, controllerMACAddress);
} else {
this.pollSlot = this._defaultPollSlot;
}
// Accounting for additional ad-hoc slot
this._pollSlotDuration = this.pollWindow / (this.pollSlots + 1);
Object.keys(this.adapter.mapping)
.forEach((deviceCls) => {
const deviceMapping = this.adapter.mapping[deviceCls];
// Only track devices that have poll settings specified
// in the documentation.
if (deviceMapping.polling) {
const deviceToPoll = Object.keys(this.adapter.bridge.devices)
.map(path => this.adapter.bridge.devices[path], this)
.find(device => (!_isNil(device.model) && device.model.cls() === deviceCls), this);
if (deviceToPoll) {
this.setupPollTrackerForDevice(deviceToPoll, this.pollWindow);
}
}
}, this);
await TimingUtility.setTimeoutPromise(100);
await new Promise((resolve, reject) => {
this.adapter.k4.device.child('properties/pollSlot').set(this.pollSlot, (err) => {
if (err) {
reject(new Error(err));
return;
}
resolve();
});
});
// Setup listeners for Polling Manager setting changes
this._optsListener = this.onPollingOptsChange.bind(this);
this.adapter.k4.device.child('properties/pollSlots').on('value', this._optsListener, this);
this.adapter.k4.device.child('properties/pollWindow').on('value', this._optsListener, this);
this.adapter.k4.device.child('properties/pollSlot').on('value', this._optsListener, this);
await TimingUtility.setTimeoutPromise(20);
// Start ad-hoc task queue
if (!_isNil(this._adHocTasksQueue)) {
throw new Error('Previous ad-hoc task queue was never removed');
}
this._adHocTasksQueue = new PromiseQueue({
concurrency: 1,
autoStart: false
});
if (_isNil(this._adHocTasksQueue)) {
throw new Error('Ad-hoc task queue was never created');
}
// Initiate pollTimer
const pollTimerTuple = this.createTimer(this.onPollSlot, this.pollWindow, this.pollSlots + 1);
if (!_isNil(this._pollTimer)) {
throw new Error('Previous poll tick timer was never removed');
}
this._pollTimer = pollTimerTuple[0];
this._pollTimerCreationTime = pollTimerTuple[1];
if (_isNil(this._pollTimer)) {
throw new Error('Underlying poll tick timer was never created');
}
this.mappingChangeMonitor.init();
this.mappingChangeHandler = this.onMappingChanged.bind(this);
this.mappingChangeMonitor.emitter.on(MappingChangeMonitor.constants.MAPPING_CHANGED_EVENT_NAME, this.mappingChangeHandler);
this.modelChangeMonitor.init();
this.modelChangeHandler = this.onModelChanged.bind(this);
this.modelChangeMonitor.emitter.on(ModelChangeMonitor.constants.MODEL_NODES_CHANGED_EVENT_NAME, this.modelChangeHandler);
return pollTimerTuple;
}
onPollingOptsChange(value, event) {
if (!_isNil(this._optsListener)) {
this.adapter.k4.device.child('properties/pollSlots').off('value', this._optsListener);
this.adapter.k4.device.child('properties/pollWindow').off('value', this._optsListener);
this.adapter.k4.device.child('properties/pollSlot').off('value', this._optsListener);
}
this.init()
.catch((e) => {
debug(`[onPollingOptsChange] Encountered error re-initializing polling manager on opts change.`, e);
this.adapter.k4.logging.error(`Encountered error re-initializing polling manager on opts change. ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
});
}
onMappingChanged(devicesWithChangedMappings) {
devicesWithChangedMappings
.forEach((deviceChangeObj) => {
try {
switch (deviceChangeObj.changeType) {
case MappingChangeMonitor.constants.DEVICE_ADDED: {
debug(`[onMappingChanged] Device mapping was added: ${deviceChangeObj.devicePath}`);
this.adapter.k4.logging.system(`[PollingManager] Device mapping was added: ${deviceChangeObj.devicePath}`);
const device = this.adapter.findBridgeDeviceByNode(deviceChangeObj.devicePath);
// Set up poll tracker for this device
this.setupPollTrackerForDevice(device, this.pollWindow);
if (!_isNil(device) &&
!_isNil(device._mapping)) {
this.emitter.emit('mappingDeviceAddedPing', deviceChangeObj.devicePath, Date.now());
}
} break;
case MappingChangeMonitor.constants.DEVICE_MODIFIED_IN_PLACE: {
debug(`[onMappingChanged] Device mapping was modified in place: ${deviceChangeObj.devicePath}. Fields changed: ${deviceChangeObj.fieldsThatChanged}`);
this.adapter.k4.logging.system(`[PollingManager] Device mapping was modified in place: ${deviceChangeObj.devicePath}. Fields changed: ${deviceChangeObj.fieldsThatChanged}`);
const device = this.adapter.findBridgeDeviceByNode(deviceChangeObj.devicePath);
// Re-initialize poll tracker for this device
this.clearPollTrackerForDevice(deviceChangeObj.devicePath);
this.setupPollTrackerForDevice(device, this.pollWindow);
if (!_isNil(device) &&
!_isNil(device._mapping)) {
this.emitter.emit('mappingDeviceChangedPing', deviceChangeObj.devicePath, Date.now());
}
} break;
case MappingChangeMonitor.constants.DEVICE_REMOVED:
debug(`[onMappingChanged] Device mapping was removed: ${deviceChangeObj.devicePath}`);
this.adapter.k4.logging.system(`[PollingManager] Device mapping was removed: ${deviceChangeObj.devicePath}`);
// Remove poll tracker for this device
this.clearPollTrackerForDevice(deviceChangeObj.devicePath);
break;
default:
break;
}
} catch (e) {
this.adapter.k4.logging.error(`[PollingManger] There was an issue handling an adapter mapping change. ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
}
}, this);
}
onModelChanged(devicesWithChangedNodes) {
devicesWithChangedNodes
.forEach((deviceNodeChangeObj) => {
try {
switch (deviceNodeChangeObj.changeType) {
case ModelChangeMonitor.constants.DEVICE_ADDED: {
debug(`[onModelChanged] Device node was added: ${deviceNodeChangeObj.devicePath}`);
this.adapter.k4.logging.system(`[PollingManager] Device node was added: ${deviceNodeChangeObj.devicePath}`);
const device = this.adapter.findBridgeDeviceByNode(deviceNodeChangeObj.devicePath);
// Set up poll tracker for this device
this.setupPollTrackerForDevice(device, this.pollWindow);
} break;
case ModelChangeMonitor.constants.DEVICE_MODIFIED_IN_PLACE: {
debug(`[onModelChanged] Device node was modified in place: ${deviceNodeChangeObj.devicePath}. Fields changed: ${deviceNodeChangeObj.fieldsThatChanged}`);
this.adapter.k4.logging.system(`[PollingManager] Device node was modified in place: ${deviceNodeChangeObj.devicePath}. Fields changed: ${deviceNodeChangeObj.fieldsThatChanged}`);
const device = this.adapter.findBridgeDeviceByNode(deviceNodeChangeObj.devicePath);
// Re-initialize poll tracker for this device
this.clearPollTrackerForDevice(deviceNodeChangeObj.devicePath);
this.setupPollTrackerForDevice(device, this.pollWindow);
} break;
case ModelChangeMonitor.constants.DEVICE_REMOVED:
debug(`[onModelChanged] Device node was removed: ${deviceNodeChangeObj.devicePath}`);
this.adapter.k4.logging.system(`[PollingManager] Device node was removed: ${deviceNodeChangeObj.devicePath}`);
// Remove poll tracker for this device
this.clearPollTrackerForDevice(deviceNodeChangeObj.devicePath);
break;
default:
break;
}
} catch (e) {
debug(`[onModelChanged] There was an issue handling an device nodes change. `, e);
this.adapter.k4.logging.error(`[PollingManager] There was an issue handling an device nodes change. ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
}
}, this);
}
/**
* @description Initializes poll interval "hourglass" trackers for each
* polled item on a device
* @param {Device} device The device to poll
* @param {Number} pollWindowMsec The duration of the pollWindow (msec)s
*/
setupPollTrackerForDevice(device, pollWindowMsec) {
if (_isNil(this._pollTrackersByDevice)) {
this._pollTrackersByDevice = new Map();
}
const polledItemsTracker = new Map();
if (!_has(device, '_mapping.polling.commands')) {
debug(`[setupPollTrackerForDevice] Mapping poll settings requires a set of commands. Skipping setting up poll tracker for device: ${!_isNil(device) ? device.devicePath : null}`);
this.adapter.k4.logging.warning(`Mapping poll settings requires a set of commands. Skipping setting up poll tracker for device: ${!_isNil(device) ? device.devicePath : null}`);
return;
}
device._mapping.polling.commands
.forEach((pollItem) => {
const pollIntervalMsec = this.obtainPollIntervalForItem(pollItem, device);
if (_isNil(pollIntervalMsec)) {
debug(`[setupPollTrackerForDevice] The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker setup for this device.`);
this.adapter.k4.logging.info(`The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker setup for this device.`);
return;
}
if (pollIntervalMsec < pollWindowMsec) {
debug(`[setupPollTrackerForDevice] pollInterval (${pollIntervalMsec}) must be a natural number multiple of pollWindow (${pollWindowMsec}). Skipping poll tracker setup for device: ${device.devicePath}`);
this.adapter.k4.logging.warning(`pollInterval (${pollIntervalMsec}) must be a natural number multiple of pollWindow (${pollWindowMsec}). Skipping poll tracker setup for device: ${device.devicePath}`);
return;
}
const numCyclesPerPoll = Math.floor(pollIntervalMsec / pollWindowMsec);
let numCyclesForStartupDelay = numCyclesPerPoll;
if (!_isNil(device) &&
!_isNil(device._mapping) &&
!_isNil(device._mapping.polling) &&
!_isNil(device._mapping.polling.maxStartupDelaySec)) {
numCyclesForStartupDelay = Math.floor((device._mapping.polling.maxStartupDelaySec * 1000) / pollWindowMsec);
}
// NOTE: For initial startup delay, will wait at MOST, the specified
// pollInterval for this pollItem. If a specific startup delay
// is specified, that will be used to determine the number of
// cycles to wait before commencing initial poll.
// At minimum, will delay 1 pollWindow (cycle) before initial
// poll.
const rng = new Chance((process.hrtime()[0] * 1e9) + process.hrtime()[1]);
const initialStartupCycles = rng.integer({
min: 1,
max: (numCyclesForStartupDelay < 1) ? 1 : numCyclesForStartupDelay
});
debug(`[setupPollTrackerForDevice] Delaying for ${initialStartupCycles} cycles, the pollItem ${JSON.stringify(pollItem, null, '\t')} for device ${device.devicePath}`);
this.adapter.k4.logging.info(`Delaying for ${initialStartupCycles} cycles, the pollItem ${JSON.stringify(pollItem, null, '\t')} for device ${device.devicePath}`);
polledItemsTracker.set(
JSON.stringify(pollItem, null, '\t'),
initialStartupCycles
);
}, this);
this._pollTrackersByDevice.set(device.devicePath, polledItemsTracker);
}
/**
* @description If the device has removed poll items from its mapping, make
* sure to not actually track, nor by extension, poll for them
* @param {Device} device
*/
prunePollTrackerForDevice(device) {
if (_isNil(this._pollTrackersByDevice)) {
this.adapter.k4.logging.system('No poll trackers available to prune');
return;
}
if (_isNil(device._mapping) || _isNil(device._mapping.polling) || _isNil(device._mapping.polling.commands)) {
this.adapter.k4.logging.system(`[PollingManager] This device ${device.devicePath} does not have any polling mapping. Skipping poll tracker pruning step.`);
return;
}
const actualPollItemsList = device._mapping.polling.commands;
if (this._pollTrackersByDevice.has(device.devicePath)) {
const polledItemsTracker = this._pollTrackersByDevice.get(device.devicePath);
polledItemsTracker
.forEach((v, pollItem) => {
// If mapping no longer has this pollItem, remove it from
// the tracker Map
if (typeof actualPollItemsList.find(elem => _isEqual(pollItem, JSON.stringify(elem, null, '\t'))) === 'undefined') {
this.adapter.k4.logging.system('Poll item being tracked no longer exists in mapping. It shall be removed from the tracker.', JSON.stringify(pollItem, null, '\t'));
polledItemsTracker.delete(pollItem);
}
});
this._pollTrackersByDevice.has(device.devicePath);
}
}
/**
* @description Nulls out poll tracker map for a particular device
* @param {String} devicePath The path to the node of the device to poll
*/
clearPollTrackerForDevice(devicePath) {
if (_isNil(this._pollTrackersByDevice)) {
return;
}
if (this._pollTrackersByDevice.has(devicePath)) {
this._pollTrackersByDevice.set(devicePath, null);
}
}
/**
* @description Determines how many pollWindows are remaining until this
* pollItem should be polled/command issued for
* @param {Device} device The device to poll
* @param {Object} pollItem The command and information associated with the
* item to be polled
* @param {Map} [pollTracker] The poll tracker for this device
* @returns {Number} Returns the number of polls remaining
*/
checkTrackerForPollItem(device, pollItem, pollTracker) {
if (_isNil(pollTracker) && !this._pollTrackersByDevice.has(device.devicePath)) {
throw new Error(`Poll tracker does not exist for this device: ${device.devicePath}`);
}
const polledItemsTracker = pollTracker || this._pollTrackersByDevice.get(device.devicePath);
if (!polledItemsTracker.has(JSON.stringify(pollItem, null, '\t'))) {
throw new Error('Could not check for this pollItem, because it does not exist in the pollTracker');
}
const numPollsRemaining = polledItemsTracker.get(JSON.stringify(pollItem, null, '\t'));
if (Number.isNaN(numPollsRemaining) === true) {
throw new Error(`Poll Decrementor/Hourglass has invalid count: ${numPollsRemaining}`);
}
return numPollsRemaining;
}
/**
* @description Should decrement the number of polls until each item should
* be re-polled in an hourglass fashion. Will roll over and reset to
* interval once the counter reaches zero. If the pollItem does not exist,
* then simply add it to the tracker map.
* @param {Device} device The device to poll
* @param {Number} pollWindowMsec The pollWindow (msec)
* @param {Object} pollItem The command and information associated with the item to
* be polled
* @returns {Number} Returns the number of polls remaining prior to
* decrement
*/
updateTrackerForPollItem(device, pollWindowMsec, pollItem) {
if (!this._pollTrackersByDevice.has(device.devicePath)) {
debug(`[updateTrackerForPollItem] Poll tracker for this device ${device.devicePath} does not exist. Will setup`);
this.adapter.k4.logging.system(`Poll tracker for this device ${device.devicePath} does not exist. Will setup`);
this.setupPollTrackerForDevice(device, this.pollWindow);
if (!this._pollTrackersByDevice.has(device.devicePath)) {
throw new Error(`Poll tracker for this device ${device.devicePath} does not exist`);
}
}
const polledItemsTracker = this._pollTrackersByDevice.get(device.devicePath);
let hasItem = false;
polledItemsTracker
.forEach((v, key) => {
if (_isEqual(key, JSON.stringify(pollItem, null, '\t'))) {
hasItem = true;
}
});
if (hasItem === false) {
debug(`[updateTrackerForPollItem] This pollItem does not yet exist in the pollTracker. Will not poll for this.`, JSON.stringify(pollItem, null, '\t'));
this.adapter.k4.logging.system('This pollItem does not yet exist in the pollTracker. Will not poll for this.', JSON.stringify(pollItem, null, '\t'));
// FIXME: Handling is going to be moved out of here and into the mapping and model change handling logic
// this.resetTrackerForPollItem(device, pollWindowMsec, pollItem);
return NaN;
}
const numPollsRemaining = this.checkTrackerForPollItem(device, pollItem, polledItemsTracker);
const pollIntervalMsec = this.obtainPollIntervalForItem(pollItem, device);
if (_isNil(pollIntervalMsec)) {
debug(`[updateTrackerForPollItem] The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker update for this device.`);
this.adapter.k4.logging.info(`The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker update for this device.`);
return;
}
polledItemsTracker.set(
JSON.stringify(pollItem, null, '\t'),
this.decrementNumPollsRemaining(pollIntervalMsec, pollWindowMsec, numPollsRemaining)
);
this._pollTrackersByDevice.set(device.devicePath, polledItemsTracker);
return numPollsRemaining;
}
resetTrackerForPollItem(device, pollWindowMsec, pollItem) {
// FIXME: Type check for JSON string vs Object, handle differently
// FIXME: Where to handle/catch these errors?
if (!this._pollTrackersByDevice.has(device.devicePath)) {
throw new Error('Poll tracker for this device does not exist');
}
const polledItemsTracker = this._pollTrackersByDevice.get(device.devicePath);
const pollIntervalMsec = this.obtainPollIntervalForItem(pollItem, device);
if (_isNil(pollIntervalMsec)) {
debug(`[resetTrackerForPollItem] The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker setup for this device.`);
this.adapter.k4.logging.info(`The specified device ${device.devicePath} does not have a poll interval defined. Skipping poll tracker setup for this device.`);
return;
}
polledItemsTracker.set(
JSON.stringify(pollItem, null, '\t'),
Math.floor(pollIntervalMsec / pollWindowMsec)
);
this._pollTrackersByDevice.set(device.devicePath, polledItemsTracker);
}
/**
* @description Correctly decrements polls remaining count while accounting
* for "hourglass"/count refill
* @param {Number} pollIntervalMsec
* @param {Number} pollWindowMsec
* @param {Number} currentNumPollsRemaining
* @returns {Number} Number of pollWindows remaining after decrementing
*/
decrementNumPollsRemaining(pollIntervalMsec, pollWindowMsec, currentNumPollsRemaining) {
if (Number.isNaN(currentNumPollsRemaining) === true) {
throw new Error(`Poll Decrementor/Hourglass has invalid count: ${currentNumPollsRemaining}`);
}
if (currentNumPollsRemaining === 0) {
return Math.floor(pollIntervalMsec / pollWindowMsec) - 1;
}
return currentNumPollsRemaining - 1;
}
/**
* @description Determines the device's node state, if available
* @param {Device} device
* @returns {String|null}
*/
checkDeviceNodeState(device) {
const nodeStatePath = 'variables/nodeState';
if (!_isNil(device) &&
!_isNil(device.model) &&
!_isNil(device.model.child(nodeStatePath))) {
return device.model.child(nodeStatePath).value();
}
this.adapter.k4.logging.debug('[PollingManager]: Unable to determine nodeState for device', device.devicePath);
return null;
}
/**
* @description Get the appropriate poll interval for an item, factoring in
* K4 Model property overrides
* @param {Object} pollItem Information and settings for polled item
* @param {Device} device The device to poll
* @returns {Number} The appropriate poll interval (msec)
*/
obtainPollIntervalForItem(pollItem, device) {
const pollIntervalSec = this.obtainPollMappingFieldValue('intervalSec', pollItem.intervalSec, device, pollItem);
if (_isNil(pollIntervalSec)) {
this.adapter.k4.logging.debug(`Poll interval does not exist for poll command ${pollItem.command} of device ${device.devicePath}`);
return;
}
if (!_isNil(pollItem.property)) {
const propOverride = pollItem.property
.find((value) => {
if (!_isNil(value) &&
!_isNil(value.argument) &&
value.argument === 'intervalSec') {
return true;
}
return false;
}, this);
if (!_isNil(propOverride) &&
!_isNil(device.model) &&
!_isNil(device.model.child(`properties/${propOverride.name}`))) {
const intervalValOverride = device.model.child(`properties/${propOverride.name}`).value();
// NOTE: The supplied model property override interval should be
// in seconds, so we must convert to milliseconds for the
// polling manager plugin.
return intervalValOverride * 1000;
}
}
return pollItem.intervalSec * 1000;
}
/**
* @description Determines the pollSlot for this controller using a
* computation involving its MAC address
* @param {Number} min The lowest pollSlot that can be assigned to this
* controller. Inclusive
* @param {Number} max The largest pollSlot that can be assigned to this
* controller. Inclusive.
* @param {String} addressMAC The MAC address for this controller
* @returns {Number} The pollSlot: an integer betwen [0, n]
*/
calculatePollSlot(min, max, addressMAC) {
const macAddress = String(addressMAC).replace(/:/g, '');
const hashObj = crypto.createHash('whirlpool');
hashObj.update(macAddress, 'utf8');
const hashedValue = new BigNumber(hashObj.digest('hex'), 16);
const assignedSlotIndex = hashedValue.modulo((max - min) + 1);
return assignedSlotIndex.toNumber() + min;
}
/**
* @description Returns the correct duration for each allotted pollSlot
* within the pollWindow or fails.
* @param {Number} pollWindowMsec Duration for all controllers' poll operations
* (msec)
* @param {Number} numSlots Total number of slots in the pollWindow
* @returns {Number} The allotted duration for a controller pollSlot (msec)
* @throws {Error}
*/
getValidatedPollSlotDuration(pollWindowMsec, numSlots) {
if (numSlots % 1 !== 0 || numSlots <= 0) {
throw new Error(`Number of slots ${numSlots} specified for the pollWindow was not a positive integer`);
}
// If pollWindow divides evenly into a minute (or is a positive integer
// multiple of minutes)
if ((60000 % pollWindowMsec === 0) || (pollWindowMsec % 60000 === 0)) {
const slotDurationMsec = pollWindowMsec / numSlots;
// If the slot duration is a whole number of seconds (msec)
if (slotDurationMsec % 1000 === 0) {
return slotDurationMsec;
}
throw new Error(`Invalid slot duration: ${slotDurationMsec} from dividing poll window ${pollWindowMsec} into ${numSlots} slots`);
}
throw new Error(`Invalid poll window ${pollWindowMsec} division into factors or multiples of 60 sec`);
}
/**
* @description Sets up the main polling timer
* @param {Function} onSlotHandler Handler to be called once per tick
* @param {Number} pollWindowMsec Duration for all controllers' poll operations
* (msec)
* @param {Number} numSlots Total number of slots in the pollWindow
* @returns {Tuple<Object, Number>} Returns reference to an array containing (i) the
* object which stores the tick tracker/metronome and (ii) the time when
* polling started
*/
createTimer(onSlotHandler, pollWindowMsec, numSlots) {
// NOTE: This is a Blocking call so that subsequent timer repetitions
// will be exactly n number of pollWindow durations distant from
// this starting call.
debug(`[createTimer] Waiting for polling timer initiation. pollWindowMsec, numSlots: `, pollWindowMsec, numSlots);
this.adapter.k4.logging.info('Waiting for polling timer initiation');
const pollSlotDurationMsec = this.getValidatedPollSlotDuration(pollWindowMsec, numSlots);
let timezone = momentTimezone.tz.guess();
try {
const controllerNode = this.adapter.k4.controller;
const controllerTimezone = controllerNode.child('variables/clock/timezone').value();
if (controllerTimezone &&
momentTimezone.tz.zone(controllerTimezone)) {
timezone = controllerTimezone;
}
} catch (e) {
debug('[createTimer] Difficulty obtaining K4Model controller timezone. Will attempt to guess via Moment.', e);
}
const timerJob = new CronTask(
`*/${Math.round(pollSlotDurationMsec / 1000)} * * * * *`,
onSlotHandler.bind(this, pollSlotDurationMsec, pollWindowMsec),
null,
false, // Do not autostart
timezone
);
timerJob.start();
return [
timerJob,
Date.now()
];
}
/**
* @description Orchestrates everything that happens within a pollSlot
* @param {Number} slotDurationMsec Duration of a pollSlot (msec)
* @param {Number} pollWindowMsec Duration of the pollWindow (msec)
*/
onPollSlot(slotDurationMsec, pollWindowMsec) {
const initialTimeMsec = Date.now();
const pollSlot = this.checkPollSlot(initialTimeMsec, slotDurationMsec, pollWindowMsec);
debug(`[onPollSlot]: initialTimeMsec, slotDurationMsec, currentPollSlot, myPollSlot:`, initialTimeMsec, slotDurationMsec, pollSlot, this.pollSlot);
if (pollSlot === this.pollSlot && this.havePolledThisWindow === false) {
this.havePolledThisWindow = true;
this.poll(initialTimeMsec)
.then((pollResultTuple) => {
const finishedTime = pollResultTuple[3];
const actualPollDuration = finishedTime - initialTimeMsec;
debug(`[onPollSlot] POLL SUCCESSFUL: initialTimeMsec, slotDurationMsec, currentPollSlot, myPollSlot, finishedTime, actualPollDuration:`, initialTimeMsec, slotDurationMsec, pollSlot, this.pollSlot, finishedTime, actualPollDuration);
})
.catch((e) => {
debug(`[onPollSlot] POLL ERROR: initialTimeMsec, slotDurationMsec, currentPollSlot, myPollSlot,:`, initialTimeMsec, slotDurationMsec, pollSlot, this.pollSlot, e);
this.adapter.k4.logging.error(`Error when polling this controller ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
});
} else if (pollSlot === this.AD_HOC_POLL_SLOT) {
this.havePolledThisWindow = false;
this.processAdHocTasks(initialTimeMsec, slotDurationMsec, this._adHocTasksQueue)
.then((adHocCompletionSummary) => {
debug(`[onPollSlot] AD HOC SUCCESSFUL: initialTimeMsec, slotDurationMsec, currentPollSlot, myPollSlot, adHocDuration, numTasksCompleted:`, initialTimeMsec, slotDurationMsec, pollSlot, this.pollSlot, adHocCompletionSummary.completedTime - initialTimeMsec, adHocCompletionSummary.numTasksCompleted);
this.adapter.k4.logging.info(`Ad hoc slot processed ${adHocCompletionSummary.numTasksCompleted} tasks in ${adHocCompletionSummary.completedTime - initialTimeMsec}`);
})
.catch((e) => {
debug(`[onPollSlot] AD HOC ERROR: initialTimeMsec, slotDurationMsec, currentPollSlot, myPollSlot,:`, initialTimeMsec, slotDurationMsec, pollSlot, this.pollSlot, e);
this.adapter.k4.logging.error('Error occurred within ad hoc slot processing', e);
});
}
}
/**
* @description Determines the designated slotId for this pollSlot
* @param {Number} currentTimeMsec The time current as of the beginning
* of this pollSlot
* @param {Number} slotDurationMsec Duration of a pollSlot (msec)
* @param {Number} pollWindowMsec Duration of the pollWindow (msec)
* @returns {Number} Returns a slotId in the range [0, pollSlots)
* (upper exclusive). In total, there are (pollSlots + 1) possible slots to
* account for the ad-hoc slot.
*/
checkPollSlot(currentTimeMsec, slotDurationMsec, pollWindowMsec) {
const offsetMsec = (currentTimeMsec % pollWindowMsec);
return Math.floor(offsetMsec / slotDurationMsec);
}
/**
* @description Manages the tasks designated for execution in the ad hoc
* slot
* @param {Number} scheduledStartTime The scheduled beginning time of the
* ad hoc slot
* @param {Number} slotDurationMsec Duration of a pollSlot (msec)
* @param {PromiseQueue} taskQueue The running queue of tasks to be executed
* in the ad hoc slot
* @returns {Promise<Object>} Resolves with the ending time of the ad hoc
* processing routine
*/
async processAdHocTasks(scheduledStartTime, slotDurationMsec, taskQueue) {
// Re-activate the queue
const tasksRemainingBefore = taskQueue.pending + taskQueue.size;
taskQueue.start();
debug(`[processAdHocTasks] Polling Manager commenced ad-hoc task execution scheduled at: ${scheduledStartTime}`);
this.adapter.k4.logging.info(`Polling Manager commenced ad-hoc task execution scheduled at: ${scheduledStartTime}`);
// Deactivate the queue once the alloted adhoc slot duration has elapsed
await TimingUtility.setTimeoutPromise(slotDurationMsec);
taskQueue.pause();
const tasksRemainingAfter = taskQueue.pending + taskQueue.size;
return {
numTasksCompleted: tasksRemainingBefore - tasksRemainingAfter,
completedTime: Date.now()
};
}
/**
* @description Enqueues a consumer to register a task for later execution
* during the designated ad hoc pollSlot
* @param {Function} task A synchronous or asynchronous task to run
* @param {Object} context The context
* @param {Array} argsArray Arguments to call the specified task with
* @returns {Promise} Resolves once the registered task has been completed
* @public
*/
registerAdHocTask(task, context, argsArray) {
const taskWithArgsApplied = _spread(task);
taskWithArgsApplied.bind(context || this);
return this._adHocTasksQueue.add(() => Promise.resolve(taskWithArgsApplied(argsArray)));
}
/**
* @description Carries out the actual polling procedure. Still allows poll
* bleed into other controllers' poll slots (by design).
* @param {Number} scheduledPollTime The RTC time for beginning of this
* poll (msec)
* @returns {Promise<Tuple<Number, Boolean, Object[], Number>>} Resolves
* with the poll starting time, Boolean if poll was called or not, a list
* of items that were polled on this run, and the poll ending time
*/
async poll(scheduledPollTime) {
const itemsPolled = [];
try {
const itemsToPoll = [];
// Critical section goes here
for (let [devicePath, currentDevice] of Object.entries(this.adapter.bridge.devices)) {
if (_isNil(currentDevice) ||
_isNil(currentDevice._mapping) ||
_isNil(currentDevice._mapping.polling)) {
continue;
}
this.prunePollTrackerForDevice(currentDevice);
const currentDeviceNodeState = this.checkDeviceNodeState(currentDevice);
if (_isNil(currentDeviceNodeState)) {
debug(`[poll] The specified device to poll does not have a valid node state`, currentDevice.devicePath);
this.adapter.k4.logging.warning('[PollingManager] The specified device to poll does not have a valid node state', currentDevice.devicePath);
continue;
}
let shouldPollAfterDeviceDead = PollingManager.constants.misc.PING_AFTER_DEAD_DEFAULT;
if (!_isNil(currentDevice) &&
!_isNil(currentDevice._mapping) &&
!_isNil(currentDevice._mapping.polling) &&
!_isNil(currentDevice._mapping.polling.pollAfterDead)) {
shouldPollAfterDeviceDead = currentDevice._mapping.polling.pollAfterDead;
}
if (currentDeviceNodeState === PollingManager.constants.nodeStates.DEAD && shouldPollAfterDeviceDead !== true) {
this.emitter.emit('skippingPollForDeadNode', currentDevice.devicePath);
debug(`[poll] pollAfterDead is set to false. Skipping poll for device`, currentDevice.devicePath);
this.adapter.k4.logging.info('pollAfterDead is set to false. Skipping poll for device', currentDevice.devicePath);
continue;
}
currentDevice._mapping.polling.commands
.forEach((pollItem) => {
try {
const pollWindowsRemaining = this.updateTrackerForPollItem(currentDevice, this.pollWindow, pollItem);
if (pollWindowsRemaining !== 0) {
return;
}
itemsToPoll.push({
pollItem: pollItem,
polledDevicePath: devicePath,
polledTime: scheduledPollTime
});
} catch (e) {
debug(`[poll] Difficulty polling the poll item: ${JSON.stringify(pollItem)}. Error was: `, e);
this.adapter.k4.logging.error(`Difficulty polling the poll item: ${JSON.stringify(pollItem)}. Error was: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
}
}, this);
}
const pollCommands = itemsToPoll
.map((itemToPollObj) => {
return async () => {
debug(`[poll] Issuing poll command: `, itemToPollObj.polledDevicePath, itemToPollObj);
const deviceForCurrentItemToPoll = this.adapter.bridge.devices[itemToPollObj.polledDevicePath];
this.emitter.emit('issuingPollCmd', deviceForCurrentItemToPoll, itemToPollObj);
try {
await this.issuePollCommand(deviceForCurrentItemToPoll, itemToPollObj.pollItem);
itemsPolled.push({
pollItem: itemToPollObj.pollItem,
polledDevicePath: itemToPollObj.devicePath,
polledTime: itemToPollObj.polledTime,
actualPolledTime: Date.now()
});
} catch (e) {
debug('[poll] Encountered error when issuing poll command: ', itemToPollObj, e);
this.adapter.k4.logging.error(`Encountered error when issuing poll command for ${JSON.stringify(itemToPollObj)}. Error was: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
}
};
});
await PromiseSeries(pollCommands);
} catch (e) {
debug(`[poll] Encountered error in the poll() method scheduled at ${scheduledPollTime}. `, e);
this.adapter.k4.logging.warning(`Encountered error in the poll() method scheduled at ${scheduledPollTime}`, e instanceof Error ? e.stack.split('\n') : null);
}
return [scheduledPollTime, true, itemsPolled, Date.now()];
}
/**
* @description Actually send a single command for a poll item
* @param {Device} device The device to poll
* @param {Object} pollItem The poll item content
* @returns {Promise} Resolves when command is passed to bridge for
* transport
*/
async issuePollCommand(device, pollItem) {
// Use K4Model properties to override config args
const pollItemJSON = JSON.stringify(pollItem, null, 4);
let cmdArgs = {};
const pollItemObj = JSON.parse(pollItemJSON);
if (typeof pollItemObj.arguments !== 'undefined' && pollItemObj.arguments !== null) {
cmdArgs = pollItemObj.arguments;
}
Object.keys(cmdArgs)
.forEach((argName) => {
cmdArgs[argName] = this.obtainPollMappingFieldValue(argName, pollItem.arguments[argName], device, pollItem);
}, this);
debug(`[poll] Issuing poll command ${pollItemObj.command} to device ${device.devicePath} with args: ${JSON.stringify(cmdArgs, null, 4)}`);
this.adapter.k4.logging.debug(`Issuing poll command ${pollItemObj.command} to device ${device.devicePath} with args: ${JSON.stringify(cmdArgs, null, 4)}`);
const cmdToSend = device.getCommand(pollItemObj.command).name;
return this.adapter.sendCommand(cmdToSend, device, cmdArgs);
}
/**
* @description Determine the value to set for a specific command argument,
* giving primacy to K4Model property overrides
* @param {String} fieldName Name of the field
* @param {any} defaultFieldValue The hard-coded fallback value for the field
* @param {Device} device The device to poll
* @param {Object} pollItem The poll item itself
* @returns {String|Boolean|Number|null} The appropriate value to set for
* this argument
*/
obtainPollMappingFieldValue(fieldName, defaultFieldValue, device, pollItem) {
if (device.isTemporaryDevice) {
return defaultFieldValue;
}
const modelOverrideVal = this.obtainPollingManagerPropOverrideForVal(fieldName, device, pollItem);
if (!_isNil(modelOverrideVal)) {
return modelOverrideVal;
}
// Return default field value if there are no model property
// overrides
return defaultFieldValue;
}
/**
* @description Determine, if available, the K4Model property override value
* for a poll item
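* @param {String} argName Name of the poll command argument to look up an
* override for
* @param {Device} device The device being polled, whose K4Model properties
* supply the override value
* @param {Object} pollItem The poll item; its optional `property` list pairs
* K4Model property names with command argument names
* @returns {any} The K4Model property value to use for the argument, or
* undefined when no override applies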
*/
obtainPollingManagerPropOverrideForVal(argName, device, pollItem) {
let propOverrides = null;
if (!_isNil(pollItem) &&
!_isNil(pollItem.property)) {
propOverrides = pollItem.property
.find(prop => prop.argument === argName, this);
}
if (!_isNil(propOverrides)) {
if (!device.model.child(`properties/${propOverrides.name}`)) {
debug(`[obtainPollingManagerPropOverrideForVal] K4Model property node to override with does not exist: ${propOverrides.name}`);
this.adapter.k4.logging.system(`K4Model property node to override with does not exist: ${propOverrides.name}`);
return undefined;
}
const propOverrideValue = device.model.child(`properties/${propOverrides.name}`).value();
return propOverrideValue;
}
}
// NOTE: Most of these could probably be garbage-collected, however
// nulling them here should remove references in case reference to
// pollingManager instance still exists.
clearTimer() {
if (_isNil(this._pollTimer) || _isNil(this._pollTimer.stop)) {
return;
}
this._pollTimer.stop();
this._pollTimer = null;
}
close(options) {
this.closeRoutine.bind(this);
if (!_isNil(options) &&
!_isNil(options.force) &&
options.force === true) {
this._initCloseQueue.pause();
this._initCloseQueue.clear();
this._initCloseQueue.start();
}
return this._initCloseQueue.add(() => this.closeRoutine());
}
async closeRoutine() {
this.clearTimer();
this.emitter.removeAllListeners('issuingPollCmd');
if (!_isNil(this.mappingChangeHandler)) {
this.mappingChangeMonitor.emitter.removeListener(MappingChangeMonitor.constants.MAPPING_CHANGED_EVENT_NAME, this.mappingChangeHandler);
this.mappingChangeHandler = null;
}
if (!_isNil(this.modelChangeHandler)) {
this.modelChangeMonitor.emitter.removeListener(ModelChangeMonitor.constants.MODEL_NODES_CHANGED_EVENT_NAME, this.modelChangeHandler);
this.modelChangeHandler = null;
}
this.mappingChangeMonitor.close();
this.modelChangeMonitor.close();
if (!_isNil(this._optsListener)) {
this.adapter.k4.device.child('properties/pollSlots').off('value', this._optsListener);
this.adapter.k4.device.child('properties/pollWindow').off('value', this._optsListener);
this.adapter.k4.device.child('properties/pollSlot').off('value', this._optsListener);
this._optsListener = null;
}
this._pollTimerCreationTime = null;
this._pollSlots = null;
this._pollWindow = null;
this._pollSlotDuration = null;
if (!_isNil(this._pollTrackersByDevice)) {
this._pollTrackersByDevice
.forEach((v, devicePath) => {
this._pollTrackersByDevice.set(devicePath, null);
}, this);
this._pollTrackersByDevice = null;
}
if (!_isNil(this._adHocTasksQueue)) {
// Clean up ad-hoc task queue
this._adHocTasksQueue.clear();
this._adHocTasksQueue = null;
}
// Reset pollSlot on K4Model to default (b/c can always recalculate)
if (this.adapter.k4.device.child('properties/pollSlot').value() === this._defaultPollSlot) {
this._pollSlot = null;
return;
}
return new Promise((resolve, reject) => {
this.adapter.k4.device.child('properties/pollSlot').set(this._defaultPollSlot, (err) => {
this._pollSlot = null;
if (err) {
reject(new Error(err));
return;
}
resolve();
});
});
}
};
module.exports = PollingManager;