transports/udp.js

'use strict';

const dgram = require('dgram');
const Transport = require('../classes/transport');
const _isNil = require('lodash/isNil');
const events = require('events');
const SafeBuffer = require('safe-buffer').Buffer;

const DEFAULT_MAX_NUM_RESTARTS = 5;
const DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME = 2000;

/**
 * @class
 * @extends Transport
 */

class Udp extends Transport {
	/**
	 * @constructor
	 * @description A transport for data that involves sending and receiving
	 * packets via UDP/Datagram
	 * @param {Object} opts General settings for the Udp Transport
	 * @param {String} opts.protocol "udp4" or "udp6"
	 * @param {Object} [opts.receiver] Setup an optional receiver
	 * @param {String} opts.receiver.port Port the receiver is listening on
	 * @param {String} opts.receiver.protocol "udp4" or "udp6"
	 * @param {Adapter} adapter Adapter instance for which this is a transport.
	 */
	constructor(opts, adapter) {
		super(opts, adapter);
		this._emitter = new events.EventEmitter();

		this._receiverOnErrorCloseCount = 0;
		this._maxNumRestartsOnError = DEFAULT_MAX_NUM_RESTARTS;
		this._allowedReceiverStartupTimeMsec = DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME;

		if (!_isNil(this._opts)) {
			if (!_isNil(this._opts.maxNumRestartsOnError)) {
				this._maxNumRestartsOnError = this._opts.maxNumRestartsOnError;
			}

			if (!_isNil(this._opts.allowedReceiverStartupTime)) {
				this._allowedReceiverStartupTimeMsec = this._opts.allowedReceiverStartupTime;
			}
		}

		this.createSocketErrorHandler();
	}

	/**
	 * @description Setup the udp listener if one is configured
	 * @returns {Promise}
	 */

	init() {
		const receiver = this._opts.receiver;
		if (receiver) {
			return this.receiver(receiver.port, receiver.protocol);
		}

		return Promise.resolve();
	}

	/**
	 * @description Send a command to the udp address
	 * @param {Object} address The address the command will be sent to
	 * @param {Command} command The command to send
	 * @returns {Promise}
	 */

	send(address, command) {
		const body = command.body;
		let headerBuf;


		if (!SafeBuffer.isBuffer(command.header) &&
			!_isNil(command) &&
			!_isNil(command.header) &&
			!_isNil(command.header.packet)) {
			headerBuf = command.header.packet;
		}

		if (_isNil(headerBuf)) {
			headerBuf = SafeBuffer.from([]);
		}

		if (_isNil(body)) {
			return Promise.reject(new Error(`There was no body supplied for command: ${command.name}`));
		}

		return this.write(SafeBuffer.concat([headerBuf, body]), address.port, address.host);
	}

	/**
	 * @description Write data to the UDP socket
	 * @param {Buffer} data The data buffer
	 * @param {Number} port The destination port
	 * @param {String} host The destination IP address
	 * @returns {Promise} Resolves after the data in the Buffer has been
	 * written to the UDP socket
	 * @private
	 */

	write(data, port, host) {
		const client = this._server ? this._server : dgram.createSocket(this._opts.protocol);

		this._adapter.k4.logging.debug('UDP transport send', data.toString('hex'));
		return new Promise((resolve, reject) => {
			client.send(data, 0, data.length, port, host, (err, bytes) => {
				if (!this._server) {
					client.close((e) => {
						if (e) {
							this._adapter.k4.logging.warning(`UDP transport close on send error. ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
							reject(e);
						} else if (err) {
							this._adapter.k4.logging.warning(`UDP transport send error. ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);
							reject(err);
						} else {
							resolve(bytes);
						}
					});
				} else if (err) {
					this._adapter.k4.logging.warning(`UDP transport send error. ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);
					reject(err);
				} else {
					resolve(bytes);
				}
			});
		});
	}

	/**
	 * @description Configure the receiver
	 * @param {Number} port The udp port to listen on
	 * @param {String} protocol "udp4" or "udp6"
	 * @returns {Promise}
	 * @private
	 */

	receiver(port, protocol) {
		this._server = dgram.createSocket(protocol);

		this._server.on('message', (data, rinfo) => this.received(data, rinfo));
		this._server.once('error', socketErr => this._emitter.emit('socketError', socketErr));

		const onSocketReady = new Promise((resolve, reject) => {
			this._server.on('listening', () => resolve());
			setTimeout(() => reject(new Error('UDP transport did not start up in time')), this._allowedReceiverStartupTimeMsec);
		});

		this._server.bind(port);
		return onSocketReady;
	}

	/**
	 * @description Additional processing on UDP
	 * receiver socket errors. Currently, it closes and reopens the socket upon
	 * each error event until the max allowable error count threshold is reached
	 * @param {Object} socket The UDP socket whose errors must be managed
	 */

	createSocketErrorHandler() {
		this._emitter.on('socketError', (err) => {
			this._adapter.k4.logging.error(`UDP transport socket error: ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);

			if (this._receiverOnErrorCloseCount < this._maxNumRestartsOnError) {
				this._receiverOnErrorCloseCount += 1;
				this.close()
					.then(() => this.init())
					.catch((e) => {
						this._adapter.k4.logging.error(`Error on closing and re-opening UDP transport socket: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
						// NOTE: This will tie failures of the error recovery
						// mechanism itself back to the original error handling
						// stream
						this._emitter.emit('socketError', e);
					});
			} else {
				// In this case, close the socket permanently if not already closed
				this.close()
					.then((socket) => {
						// NOTE: On successful close, must still listen and
						// account for rogue errors on this socket. Although,
						// this should never happen
						if (!_isNil(socket)) {
							socket.once('error', e => this._emitter.emit('socketError', e));
						}

						this._adapter.k4.logging.fatal('UDP transport encountered socket error too many times. No longer attempting to re-open');
					})
					.catch(e => this._adapter.k4.logging.fatal(`Error on closing UDP transport socket: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null));
			}
		});
	}

	/**
	 * @description Process incoming data
	 * @param {Buffer} data The data buffer
	 * @param {Object} rinfo Info about the receiver
	 * @private
	 */

	received(data, rinfo) {
		this._adapter.k4.logging.debug('UDP transport received', data.toString('hex'));
		this._adapter.received({
			host: rinfo.address,
			port: rinfo.port
		}, data);
	}

	/**
	 * @description Close the listener
	 * @returns {Promise}
	 */

	close() {
		return new Promise((resolve, reject) => {
			if (this._server) {
				if (this._server._receiving === true) {
					this._server.close((err) => {
						if (err) {
							reject(err);
						} else {
							resolve(this._server);
						}
					});
				} else {
					resolve(this._server);
				}
			} else {
				resolve();
			}
		});
	}
};

module.exports = Udp;