'use strict';
const dgram = require('dgram');
const Transport = require('../classes/transport');
const _isNil = require('lodash/isNil');
const events = require('events');
const SafeBuffer = require('safe-buffer').Buffer;
const DEFAULT_MAX_NUM_RESTARTS = 5;
const DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME = 2000;
/**
* @class
* @extends Transport
*/
class Udp extends Transport {
/**
* @constructor
* @description A transport for data that involves sending and receiving
* packets via UDP/Datagram
* @param {Object} opts General settings for the Udp Transport
* @param {String} opts.protocol "udp4" or "udp6"
* @param {Object} [opts.receiver] Setup an optional receiver
* @param {String} opts.receiver.port Port the receiver is listening on
* @param {String} opts.receiver.protocol "udp4" or "udp6"
* @param {Adapter} adapter Adapter instance for which this is a transport.
*/
constructor(opts, adapter) {
super(opts, adapter);
this._emitter = new events.EventEmitter();
this._receiverOnErrorCloseCount = 0;
this._maxNumRestartsOnError = DEFAULT_MAX_NUM_RESTARTS;
this._allowedReceiverStartupTimeMsec = DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME;
if (!_isNil(this._opts)) {
if (!_isNil(this._opts.maxNumRestartsOnError)) {
this._maxNumRestartsOnError = this._opts.maxNumRestartsOnError;
}
if (!_isNil(this._opts.allowedReceiverStartupTime)) {
this._allowedReceiverStartupTimeMsec = this._opts.allowedReceiverStartupTime;
}
}
this.createSocketErrorHandler();
}
/**
* @description Setup the udp listener if one is configured
* @returns {Promise}
*/
init() {
const receiver = this._opts.receiver;
if (receiver) {
return this.receiver(receiver.port, receiver.protocol);
}
return Promise.resolve();
}
/**
* @description Send a command to the udp address
* @param {Object} address The address the command will be sent to
* @param {Command} command The command to send
* @returns {Promise}
*/
send(address, command) {
const body = command.body;
let headerBuf;
if (!SafeBuffer.isBuffer(command.header) &&
!_isNil(command) &&
!_isNil(command.header) &&
!_isNil(command.header.packet)) {
headerBuf = command.header.packet;
}
if (_isNil(headerBuf)) {
headerBuf = SafeBuffer.from([]);
}
if (_isNil(body)) {
return Promise.reject(new Error(`There was no body supplied for command: ${command.name}`));
}
return this.write(SafeBuffer.concat([headerBuf, body]), address.port, address.host);
}
/**
* @description Write data to the UDP socket
* @param {Buffer} data The data buffer
* @param {Number} port The destination port
* @param {String} host The destination IP address
* @returns {Promise} Resolves after the data in the Buffer has been
* written to the UDP socket
* @private
*/
write(data, port, host) {
const client = this._server ? this._server : dgram.createSocket(this._opts.protocol);
this._adapter.k4.logging.debug('UDP transport send', data.toString('hex'));
return new Promise((resolve, reject) => {
client.send(data, 0, data.length, port, host, (err, bytes) => {
if (!this._server) {
client.close((e) => {
if (e) {
this._adapter.k4.logging.warning(`UDP transport close on send error. ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
reject(e);
} else if (err) {
this._adapter.k4.logging.warning(`UDP transport send error. ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);
reject(err);
} else {
resolve(bytes);
}
});
} else if (err) {
this._adapter.k4.logging.warning(`UDP transport send error. ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);
reject(err);
} else {
resolve(bytes);
}
});
});
}
/**
* @description Configure the receiver
* @param {Number} port The udp port to listen on
* @param {String} protocol "udp4" or "udp6"
* @returns {Promise}
* @private
*/
receiver(port, protocol) {
this._server = dgram.createSocket(protocol);
this._server.on('message', (data, rinfo) => this.received(data, rinfo));
this._server.once('error', socketErr => this._emitter.emit('socketError', socketErr));
const onSocketReady = new Promise((resolve, reject) => {
this._server.on('listening', () => resolve());
setTimeout(() => reject(new Error('UDP transport did not start up in time')), this._allowedReceiverStartupTimeMsec);
});
this._server.bind(port);
return onSocketReady;
}
/**
* @description Additional processing on UDP
* receiver socket errors. Currently, it closes and reopens the socket upon
* each error event until the max allowable error count threshold is reached
* @param {Object} socket The UDP socket whose errors must be managed
*/
createSocketErrorHandler() {
this._emitter.on('socketError', (err) => {
this._adapter.k4.logging.error(`UDP transport socket error: ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);
if (this._receiverOnErrorCloseCount < this._maxNumRestartsOnError) {
this._receiverOnErrorCloseCount += 1;
this.close()
.then(() => this.init())
.catch((e) => {
this._adapter.k4.logging.error(`Error on closing and re-opening UDP transport socket: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
// NOTE: This will tie failures of the error recovery
// mechanism itself back to the original error handling
// stream
this._emitter.emit('socketError', e);
});
} else {
// In this case, close the socket permanently if not already closed
this.close()
.then((socket) => {
// NOTE: On successful close, must still listen and
// account for rogue errors on this socket. Although,
// this should never happen
if (!_isNil(socket)) {
socket.once('error', e => this._emitter.emit('socketError', e));
}
this._adapter.k4.logging.fatal('UDP transport encountered socket error too many times. No longer attempting to re-open');
})
.catch(e => this._adapter.k4.logging.fatal(`Error on closing UDP transport socket: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null));
}
});
}
/**
* @description Process incoming data
* @param {Buffer} data The data buffer
* @param {Object} rinfo Info about the receiver
* @private
*/
received(data, rinfo) {
this._adapter.k4.logging.debug('UDP transport received', data.toString('hex'));
this._adapter.received({
host: rinfo.address,
port: rinfo.port
}, data);
}
/**
* @description Close the listener
* @returns {Promise}
*/
close() {
return new Promise((resolve, reject) => {
if (this._server) {
if (this._server._receiving === true) {
this._server.close((err) => {
if (err) {
reject(err);
} else {
resolve(this._server);
}
});
} else {
resolve(this._server);
}
} else {
resolve();
}
});
}
};
module.exports = Udp;