transports/dtls.js

'use strict';

const dgram = require('dgram');
const dtlsServer = require('@k4connect/node-mbed-dtls');
const dtlsClient = require('@k4connect/node-mbed-dtls-client');
const Transport = require('../classes/transport');
const _isNil = require('lodash/isNil');
const _get = require('lodash/get');
const events = require('events');
const SafeBuffer = require('safe-buffer').Buffer;
const async = require('async');

const DEFAULT_MAX_NUM_RESTARTS = 5;
const DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME_MSEC = 2000;
const DEFAULT_SOCKET_FAMILY = 'udp4';
const DEFAULT_HANDSHAKE_TIMEOUT_MIN_MSEC = 1000;
const DEFAULT_SOCKET_SESSION_CLOSE_TIMEOUT_MSEC = 60000;

const DTLS_HANDSHAKE_RETRY_INTERVAL_MS = 100;

const DTLS_TRANSPORT_CLOSE_DELAY_MS = 1000;

/**
 * @class
 * @extends Transport
 */

class Dtls extends Transport {
	/**
	 * @constructor
	 * @description A transport for data that involves sending and receiving
	 * packets via Datagram Transport Layer Security. See RFC 6347:
	 * https://tools.ietf.org/html/rfc6347
	 * @param {Object} opts General settings for the Dtls Transport
	 * @param {String} opts.protocol "udp4" or "udp6"
	 * @param {Buffer} opts.port Port the underlying datagram socket shall be
	 * bound to. If not specified, the operating system is requested to assign
	 * a port at its discretion.
	 * @param {Buffer} opts.psk The preshared key between this Transport and
	 * external clients or servers
	 * @param {Buffer} [opts.maxNumRestartsOnError] Inclusive upper limit on
	 * the number of total "cycles" that the DTLS receiver will go through
	 * after terminating on an error. A cycle is one iteration of a termination
	 * followed by a reinitialization of the DTLS receiver.
	 * @param {Number} [opts.allowedReceiverStartupTimeMsec] The duration (msec)
	 * to wait for the DTLS receiver to complete initialization.
	 * @param {Number} [opts.handshakeTimeoutMinMsec] The duration (msec)
	 * to wait for the DTLS receiver to complete a handshake when clients are
	 * connecting.
	 * @param {Number} [opts.socketSessionCloseTimeoutMsec] Time before marking
	 * a DTLS Session as having expired due to lack of communication.
	 * @param {Adapter} adapter Adapter instance for which this is a transport.
	 */

	constructor(opts, adapter) {
		super(opts, adapter);
		this._emitter = new events.EventEmitter();
		this._socketSessionMap = new Map();

		this._receiverOnErrorCloseCount = 0;
		this._maxNumRestartsOnError = DEFAULT_MAX_NUM_RESTARTS;
		this._allowedReceiverStartupTimeMsec = DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME_MSEC;
		this._socketSessionCloseTimeoutMsec = DEFAULT_SOCKET_SESSION_CLOSE_TIMEOUT_MSEC;

		if (_isNil(this._opts)) {
			this._opts = {};
		}

		if (!_isNil(this._opts.maxNumRestartsOnError)) {
			this._maxNumRestartsOnError = this._opts.maxNumRestartsOnError;
		}

		if (!_isNil(this._opts.allowedReceiverStartupTimeMsec)) {
			this._allowedReceiverStartupTimeMsec = this._opts.allowedReceiverStartupTimeMsec;
		}

		if (!_isNil(this._opts.socketSessionCloseTimeoutMsec)) {
			this._socketSessionCloseTimeoutMsec = this._opts.socketSessionCloseTimeoutMsec;
		}

		this._opts.protocol = this._opts.protocol || DEFAULT_SOCKET_FAMILY;
		this._opts.psk = this._opts.psk || SafeBuffer.from('');

		if (typeof this._opts.port !== 'number') {
			this._opts.port = 0;
		}

		if (typeof this._opts.handshakeTimeoutMinMsec !== 'number') {
			this._opts.handshakeTimeoutMin = DEFAULT_HANDSHAKE_TIMEOUT_MIN_MSEC;
		}

		this.initializeReceiverErrorHandler();
	}

	/**
	 * @description Setup the listener if one is configured
	 * @returns {Promise}
	 */

	init() {
		const socketToUse = dgram.createSocket(this._opts.protocol);
		return new Promise((resolve, reject) => {
			setTimeout(() => {
				reject(new Error('Dtls transport did not start up in time'));
			}, this._allowedReceiverStartupTimeMsec);

			this._dtlsServer = dtlsServer.createServer(
				{
					handshakeTimeoutMin: this._opts.handshakeTimeoutMinMsec,
					socket: socketToUse,
					psk_cb: (pskIdentity) => {
						this._adapter.k4.logging.info(`'[Dtls] Got pskIdentity: ${pskIdentity}`);
						return this._opts.psk;
					}
				}
			)
				.on('listening', () => {
					this._universalDgramSocket = socketToUse;
					resolve();
				})
				.on('secureConnection', (socket) => {
					this.commonNewSocketHandler(socket);
				})
				.on('clientError', (err, client) => this._adapter.k4.logging.info('DTLS Transport receiver (DTLS server) got Client Error: ', Date.now() / 1000, err, client.remoteAddress, client.remotePort))
				.once('error', (serverError) => {
					reject(serverError);
				})
				.on('close', () => this._adapter.k4.logging.info('DTLS Transport receiver (DTLS server) was CLOSED: ', Date.now() / 1000))
				.on('error', dtlsServerError => this._emitter.emit('dtlsReceiverError', dtlsServerError));

			this._dtlsServer.listen(this._opts.port);
		});
	}

	/**
	 * @description Send a command to the specified address
	 * @param {Object} address The address the command will be sent to
	 * @param {Command} command The command to send
	 * @returns {Promise}
	 */

	send(address, command) {
		const body = command.body;
		let headerBuf;

		if (!SafeBuffer.isBuffer(command.header) &&
			!_isNil(command) &&
			!_isNil(command.header) &&
			!_isNil(command.header.packet)) {
			headerBuf = command.header.packet;
		}

		if (_isNil(headerBuf)) {
			headerBuf = SafeBuffer.from([]);
		}

		if (_isNil(body)) {
			return Promise.reject(new Error(`There was no body supplied for command: ${command.name}`));
		}

		return this.write(SafeBuffer.concat([headerBuf, body]), address.port, address.host);
	}

	/**
	 * @description Write data to the DTLS socket session
	 * @param {Buffer} data The data buffer
	 * @param {Number} port The destination port
	 * @param {String} host The destination IP address
	 * @returns {Promise} Resolves after the data in the Buffer has been
	 * written to the DTLS socket session
	 * @private
	 */

	write(data, port, host) {
		this._adapter.k4.logging.debug('Dtls transport send', data.toString('hex'));

		return new Promise((resolve, reject) => {
			// Check whether a socket session exists already for this
			// host:port. If one exists, use that socket session.
			if (this._socketSessionMap.has(`${host}:${port}`)) {
				const socketSessionMapEntry = this._socketSessionMap
					.get(`${host}:${port}`);

				const socketSession = socketSessionMapEntry.socketSession;
				if (_isNil(socketSession)) {
					reject(new Error(`DTLS socket (${host}:${port}) session does not exist.`));
					return;
				}

				if (!_get(socketSession, 'writable', false)) {
					reject(new Error(`DTLS socket (${host}:${port}) is not writable.`));
					return;
				}

				if (!_get(socketSession, 'connected', false)) {
					reject(new Error(`DTLS socket (${host}:${port}) is not connected.`));
					return;
				}

				socketSession
					.write(data, 'buffer', (err) => {
						if (err) {
							reject(err);
							return;
						}

						resolve();
					});
			} else {
				const options = {
					host: host,
					port: port,
					psk: this._opts.psk,
					pskIdentity: SafeBuffer.from('k4-base-bridge'),
					socket: this._universalDgramSocket
				};

				async.retry({
					times: 5,
					interval: DTLS_HANDSHAKE_RETRY_INTERVAL_MS
				},
				cb => {
					var handshakeErrorHandler = null;
					let dtlsClientSocket;
					const initialSocketConnectionTimer = setTimeout(() => {
						if (!_isNil(dtlsClientSocket)) {
							dtlsClientSocket.end();
							dtlsClientSocket.removeAllListeners();
							dtlsClientSocket = null;
						}

						cb(new Error('Waiting for initial socket connection timed out'));
					}, 500);

					dtlsClientSocket = dtlsClient.connect(options, (socketSession) => {
						clearTimeout(initialSocketConnectionTimer);

						this.commonNewSocketHandler.apply(this, [socketSession]);
						if (!_get(socketSession, 'writable', false)) {
							cb(new Error(`DTLS socket (${host}:${port}) is not writable.`));
							return;
						}

						if (!_get(socketSession, 'connected', false)) {
							cb(new Error(`DTLS socket (${host}:${port}) is not connected.`));
							return;
						}

						socketSession.write(data, 'buffer', (err) => {
							if (handshakeErrorHandler != null &&
								!_isNil(dtlsClientSocket)) {
								// Okay to remove this handler now, since by this time commonNewSocketHandler() will have added another error listener.
								dtlsClientSocket.removeListener('error', handshakeErrorHandler);
							}

							if (err) {
								cb(err);
								return;
							}

							cb(null);
						});
					});

					dtlsClientSocket.once('error', handshakeErrorHandler = (err, optionalMessage) => {
						clearTimeout(initialSocketConnectionTimer);

						// Register another handler in case another error occurs before the socket is closed, to prevent ERR_UNHANDLED_ERROR.
						dtlsClientSocket.on('error', error => console.error('[Dtls] Additional Socket Session Error after handshake error:', error));

						if (optionalMessage) {
							cb(optionalMessage);
						} else {
							cb(err);
						}
					});
				},
				(err, result) => {
					if(err) {
						return reject(err);
					} else {
						return resolve(result); // result is currently undefined, but specified here to prevent accidental omission later
					}
				});
			}
		});
	}

	/**
	 * @description Provides the logic to add the new socket to the session
	 * tracker map and set up necessary handlers.
	 * @param {DtlsSocket} socket
	 */
	commonNewSocketHandler(socket) {
		this._adapter.k4.logging.info(`[Dtls]/(commonNewSocketHandler) -- Creating new socketSession map entry for ${socket.remoteAddress}:${socket.remotePort}`);
		if (this._socketSessionMap.has(`${socket.remoteAddress}:${socket.remotePort}`)) {
			this._adapter.k4.logging.info(`[Dtls]/(commonNewSocketHandler) -- socketSession map entry already exists for ${socket.remoteAddress}:${socket.remotePort}, replacing`);
		}

		this._socketSessionMap.set(
			`${socket.remoteAddress}:${socket.remotePort}`,
			{
				socketSession: socket,
				lastSeenTimer: setTimeout(() => {
					if (socket) {
						socket.end();
						socket.removeAllListeners('data');
						socket.removeAllListeners('error');
					}
				}, this._socketSessionCloseTimeoutMsec)
			}
		);

		let socketSessionEntry = this._socketSessionMap.get(`${socket.remoteAddress}:${socket.remotePort}`);

		socketSessionEntry
			.socketSession
			.on('data', (msg) => {
				clearTimeout(socketSessionEntry.lastSeenTimer);
				socketSessionEntry.lastSeenTimer = setTimeout(() => {
					if (socket) {
						socket.end();
						socket.removeAllListeners('data');
						socket.removeAllListeners('error');
					}
				}, this._socketSessionCloseTimeoutMsec);

				this.received(msg, {
					address: socket.remoteAddress,
					port: socket.remotePort
				});
			});

		socketSessionEntry
			.socketSession
			.on('error', error => console.error('[Dtls] Socket Session Error:', error));

		socketSessionEntry
			.socketSession
			.once('close', () => {
				console.info(`[Dtls] socket session was closed: ${socket.remoteAddress}:${socket.remotePort}`);
				// Ensure that we clean up any socket sessions that may have
				// expired or have been manually termianted.
				this._socketSessionMap.delete(`${socket.remoteAddress}:${socket.remotePort}`);
			});
	}

	/**
	 * @description Additional processing on Dtls Receiver/Server errors.
	 * Currently, it closes and reopens the DTLS Server upon each error event
	 * until the max allowable error count threshold is reached.
	 */

	initializeReceiverErrorHandler() {
		this._emitter.on('dtlsReceiverError', (err) => {
			this._adapter.k4.logging.error(`DTLS Receiver/Server error: ${err.toString()}`, err instanceof Error ? err.stack.split('\n') : null);

			if (this._receiverOnErrorCloseCount < this._maxNumRestartsOnError) {
				this._receiverOnErrorCloseCount += 1;
				this.close()
					.then(() => this.init())
					.catch((e) => {
						this._adapter.k4.logging.error(`Error on closing and re-opening DTLS Receiver/Server: ${e.toString()}`, e instanceof Error ? e.stack.split('\n') : null);
						// NOTE: This will tie failures of the error recovery
						// mechanism itself back to the original error handling
						// stream
						this._emitter.emit('dtlsReceiverError', e);
					});
			} else {
				// In this case, close the Dtls Receiver/Server permanently if
				// not already closed
				this.close()
					.then(() => this._adapter.k4.logging.fatal('DTLS transport encountered Receiver/Server error too many times. No longer attempting to re-open'));
			}
		});
	}

	/**
	 * @description Process incoming data
	 * @param {Buffer} data The data buffer
	 * @param {Object} rinfo Info about the receiver
	 * @private
	 */

	received(data, rinfo) {
		this._adapter.k4.logging.debug('DTLS transport received', data.toString('hex'));
		this._adapter.received({
			host: rinfo.address,
			port: rinfo.port
		}, data);
	}

	/**
	 * @description Close the Dtls Server, if one was created.
	 * @returns {Promise}
	 */

	close() {
		return Promise.resolve()
			.then(() => {
				this._socketSessionMap
					.forEach((elem, k) => {
						if (elem) {
							if (elem.socketSession) {
								console.log(`Closing DTLS session: ${elem.socketSession.remoteAddress}:${elem.socketSession.remotePort}`);
								elem.socketSession.end();
							}

							if (elem.lastSeenTimer) {
								clearTimeout(elem.lastSeenTimer);
							}
						}
					});

				this._socketSessionMap.clear();
			})
			.then(() => new Promise((resolve) => {
				setTimeout(resolve, DTLS_TRANSPORT_CLOSE_DELAY_MS);
			}))
			.then(() => new Promise((resolve) => {
				if (this._dtlsServer && this._dtlsServer.listening) {
					this._dtlsServer.close(() => resolve());
				} else {
					resolve();
				}
			}));
	}
};

module.exports = Dtls;