'use strict';
const dgram = require('dgram');
const dtlsServer = require('@k4connect/node-mbed-dtls');
const dtlsClient = require('@k4connect/node-mbed-dtls-client');
const Transport = require('../classes/transport');
const _isNil = require('lodash/isNil');
const _get = require('lodash/get');
const events = require('events');
const SafeBuffer = require('safe-buffer').Buffer;
const async = require('async');
// Default upper limit on close/re-init cycles of the DTLS receiver after errors
// (overridable via opts.maxNumRestartsOnError).
const DEFAULT_MAX_NUM_RESTARTS = 5;
// Default time (msec) init() waits for the DTLS receiver to start listening
// (overridable via opts.allowedReceiverStartupTimeMsec).
const DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME_MSEC = 2000;
// Datagram socket family used when opts.protocol is not supplied.
const DEFAULT_SOCKET_FAMILY = 'udp4';
// Fallback handshake timeout (msec) applied when opts.handshakeTimeoutMinMsec
// is not a number.
const DEFAULT_HANDSHAKE_TIMEOUT_MIN_MSEC = 1000;
// Default idle time (msec) after which a tracked DTLS session is ended
// (overridable via opts.socketSessionCloseTimeoutMsec).
const DEFAULT_SOCKET_SESSION_CLOSE_TIMEOUT_MSEC = 60000;
// Interval (msec) between outbound connection attempts made through async.retry.
const DTLS_HANDSHAKE_RETRY_INTERVAL_MS = 100;
// Delay (msec) close() waits between ending the tracked sessions and closing
// the DTLS server.
const DTLS_TRANSPORT_CLOSE_DELAY_MS = 1000;
/**
 * @class
 * @extends Transport
 * @description Transport that exchanges packets over DTLS. It runs a DTLS
 * server (the "receiver") on a datagram socket and opens DTLS client
 * sessions on demand for outbound writes. Every established session is
 * tracked in a map keyed by "remoteAddress:remotePort".
 */
class Dtls extends Transport {
  /**
   * @constructor
   * @description A transport for data that involves sending and receiving
   * packets via Datagram Transport Layer Security. See RFC 6347:
   * https://tools.ietf.org/html/rfc6347
   * @param {Object} opts General settings for the Dtls Transport
   * @param {String} [opts.protocol] "udp4" or "udp6"
   * @param {Number} [opts.port] Port the underlying datagram socket shall be
   * bound to. If not specified, the operating system is requested to assign
   * a port at its discretion.
   * @param {Buffer} [opts.psk] The preshared key between this Transport and
   * external clients or servers
   * @param {Number} [opts.maxNumRestartsOnError] Inclusive upper limit on
   * the number of total "cycles" that the DTLS receiver will go through
   * after terminating on an error. A cycle is one iteration of a termination
   * followed by a reinitialization of the DTLS receiver.
   * @param {Number} [opts.allowedReceiverStartupTimeMsec] The duration (msec)
   * to wait for the DTLS receiver to complete initialization.
   * @param {Number} [opts.handshakeTimeoutMinMsec] The duration (msec)
   * to wait for the DTLS receiver to complete a handshake when clients are
   * connecting.
   * @param {Number} [opts.socketSessionCloseTimeoutMsec] Time before marking
   * a DTLS Session as having expired due to lack of communication.
   * @param {Adapter} adapter Adapter instance for which this is a transport.
   */
  constructor(opts, adapter) {
    super(opts, adapter);
    this._emitter = new events.EventEmitter();
    this._socketSessionMap = new Map();
    this._receiverOnErrorCloseCount = 0;
    this._maxNumRestartsOnError = DEFAULT_MAX_NUM_RESTARTS;
    this._allowedReceiverStartupTimeMsec = DEFAULT_ALLOWED_RECEIVER_STARTUP_TIME_MSEC;
    this._socketSessionCloseTimeoutMsec = DEFAULT_SOCKET_SESSION_CLOSE_TIMEOUT_MSEC;
    if (_isNil(this._opts)) {
      this._opts = {};
    }
    if (!_isNil(this._opts.maxNumRestartsOnError)) {
      this._maxNumRestartsOnError = this._opts.maxNumRestartsOnError;
    }
    if (!_isNil(this._opts.allowedReceiverStartupTimeMsec)) {
      this._allowedReceiverStartupTimeMsec = this._opts.allowedReceiverStartupTimeMsec;
    }
    if (!_isNil(this._opts.socketSessionCloseTimeoutMsec)) {
      this._socketSessionCloseTimeoutMsec = this._opts.socketSessionCloseTimeoutMsec;
    }
    this._opts.protocol = this._opts.protocol || DEFAULT_SOCKET_FAMILY;
    this._opts.psk = this._opts.psk || SafeBuffer.from('');
    if (typeof this._opts.port !== 'number') {
      this._opts.port = 0;
    }
    // The default must land on the same property init() reads
    // (handshakeTimeoutMinMsec); otherwise it never reaches the server.
    if (typeof this._opts.handshakeTimeoutMinMsec !== 'number') {
      this._opts.handshakeTimeoutMinMsec = DEFAULT_HANDSHAKE_TIMEOUT_MIN_MSEC;
    }
    this.initializeReceiverErrorHandler();
  }

  /**
   * @description Create the datagram socket and the DTLS server on top of it
   * and start listening on the configured port.
   * @returns {Promise} Resolves once the server emits 'listening'. Rejects
   * with the first server 'error', with any exception thrown while creating
   * the server, or when the allowed startup time elapses first.
   */
  init() {
    const socketToUse = dgram.createSocket(this._opts.protocol);
    return new Promise((resolve, reject) => {
      const startupTimer = setTimeout(() => {
        reject(new Error('Dtls transport did not start up in time'));
      }, this._allowedReceiverStartupTimeMsec);
      try {
        this._dtlsServer = dtlsServer.createServer(
          {
            handshakeTimeoutMin: this._opts.handshakeTimeoutMinMsec,
            socket: socketToUse,
            psk_cb: (pskIdentity) => {
              this._adapter.k4.logging.info(`'[Dtls] Got pskIdentity: ${pskIdentity}`);
              return this._opts.psk;
            }
          }
        )
          .on('listening', () => {
            // Startup succeeded; the watchdog timer is no longer needed.
            clearTimeout(startupTimer);
            this._universalDgramSocket = socketToUse;
            resolve();
          })
          .on('secureConnection', (socket) => {
            this.commonNewSocketHandler(socket);
          })
          .on('clientError', (err, client) => this._adapter.k4.logging.info(
            'DTLS Transport receiver (DTLS server) got Client Error: ',
            Date.now() / 1000,
            err,
            _get(client, 'remoteAddress'),
            _get(client, 'remotePort')
          ))
          .once('error', (serverError) => {
            clearTimeout(startupTimer);
            reject(serverError);
          })
          .on('close', () => this._adapter.k4.logging.info('DTLS Transport receiver (DTLS server) was CLOSED: ', Date.now() / 1000))
          // Every server error is also forwarded to the restart logic
          // installed by initializeReceiverErrorHandler().
          .on('error', dtlsServerError => this._emitter.emit('dtlsReceiverError', dtlsServerError));
        this._dtlsServer.listen(this._opts.port);
      } catch (startupError) {
        // A synchronous failure rejects the promise either way; clearing the
        // timer here keeps it from lingering until the timeout elapses.
        clearTimeout(startupTimer);
        reject(startupError);
      }
    });
  }

  /**
   * @description Send a command to the specified address. The payload is
   * command.header.packet (when header is a non-Buffer object carrying a
   * packet) followed by command.body.
   * @param {Object} address The address the command will be sent to
   * @param {String} address.host Destination host
   * @param {Number} address.port Destination port
   * @param {Command} command The command to send
   * @returns {Promise} Resolves when the data has been written; rejects when
   * the command, its body or the address is missing, or when writing fails.
   */
  send(address, command) {
    // Validate before touching any property so that bad input surfaces as a
    // rejected Promise rather than a synchronous TypeError.
    if (_isNil(command)) {
      return Promise.reject(new Error('There was no command supplied to send'));
    }
    const body = command.body;
    if (_isNil(body)) {
      return Promise.reject(new Error(`There was no body supplied for command: ${command.name}`));
    }
    if (_isNil(address)) {
      return Promise.reject(new Error(`There was no address supplied for command: ${command.name}`));
    }
    let headerBuf;
    if (!_isNil(command.header) &&
      !SafeBuffer.isBuffer(command.header) &&
      !_isNil(command.header.packet)) {
      headerBuf = command.header.packet;
    }
    if (_isNil(headerBuf)) {
      headerBuf = SafeBuffer.from([]);
    }
    return this.write(SafeBuffer.concat([headerBuf, body]), address.port, address.host);
  }

  /**
   * @description Write data to the DTLS socket session for host:port. An
   * already tracked session is reused; otherwise a client session is opened
   * (retried up to 5 times) and the data is written once it is connected.
   * @param {Buffer} data The data buffer
   * @param {Number} port The destination port
   * @param {String} host The destination IP address
   * @returns {Promise} Resolves after the data in the Buffer has been
   * written to the DTLS socket session
   * @private
   */
  write(data, port, host) {
    this._adapter.k4.logging.debug('Dtls transport send', data.toString('hex'));
    const sessionKey = `${host}:${port}`;
    return new Promise((resolve, reject) => {
      // Check whether a socket session exists already for this
      // host:port. If one exists, use that socket session.
      if (this._socketSessionMap.has(sessionKey)) {
        const socketSessionMapEntry = this._socketSessionMap.get(sessionKey);
        const socketSession = socketSessionMapEntry.socketSession;
        if (_isNil(socketSession)) {
          reject(new Error(`DTLS socket (${host}:${port}) session does not exist.`));
          return;
        }
        if (!_get(socketSession, 'writable', false)) {
          reject(new Error(`DTLS socket (${host}:${port}) is not writable.`));
          return;
        }
        if (!_get(socketSession, 'connected', false)) {
          reject(new Error(`DTLS socket (${host}:${port}) is not connected.`));
          return;
        }
        socketSession.write(data, 'buffer', (err) => {
          if (err) {
            reject(err);
            return;
          }
          resolve();
        });
        return;
      }

      const options = {
        host: host,
        port: port,
        psk: this._opts.psk,
        pskIdentity: SafeBuffer.from('k4-base-bridge'),
        socket: this._universalDgramSocket
      };
      async.retry(
        {
          times: 5,
          interval: DTLS_HANDSHAKE_RETRY_INTERVAL_MS
        },
        (cb) => {
          let dtlsClientSocket = null;
          let handshakeErrorHandler = null;
          let initialSocketConnectionTimer = null;
          let attemptSettled = false;

          // The timeout, the handshake error listener and the write callback
          // can all fire for the same attempt. Reporting more than once to
          // async.retry could start extra attempts (and duplicate sends), so
          // only the first outcome is forwarded.
          const settleAttempt = (outcome) => {
            if (attemptSettled) {
              return;
            }
            attemptSettled = true;
            clearTimeout(initialSocketConnectionTimer);
            cb(outcome);
          };

          initialSocketConnectionTimer = setTimeout(() => {
            if (!_isNil(dtlsClientSocket)) {
              dtlsClientSocket.end();
              dtlsClientSocket.removeAllListeners();
              dtlsClientSocket = null;
            }
            settleAttempt(new Error('Waiting for initial socket connection timed out'));
          }, 500);

          dtlsClientSocket = dtlsClient.connect(options, (socketSession) => {
            clearTimeout(initialSocketConnectionTimer);
            if (attemptSettled) {
              // This attempt already failed (timeout or handshake error);
              // do not register or write on its stale session.
              return;
            }
            this.commonNewSocketHandler(socketSession);
            if (!_get(socketSession, 'writable', false)) {
              settleAttempt(new Error(`DTLS socket (${host}:${port}) is not writable.`));
              return;
            }
            if (!_get(socketSession, 'connected', false)) {
              settleAttempt(new Error(`DTLS socket (${host}:${port}) is not connected.`));
              return;
            }
            socketSession.write(data, 'buffer', (err) => {
              if (handshakeErrorHandler !== null && !_isNil(dtlsClientSocket)) {
                // Okay to remove this handler now, since by this time
                // commonNewSocketHandler() will have added another error listener.
                dtlsClientSocket.removeListener('error', handshakeErrorHandler);
              }
              settleAttempt(err ? err : null);
            });
          });

          handshakeErrorHandler = (err, optionalMessage) => {
            clearTimeout(initialSocketConnectionTimer);
            if (!_isNil(dtlsClientSocket)) {
              // Register another handler in case another error occurs before
              // the socket is closed, to prevent ERR_UNHANDLED_ERROR.
              dtlsClientSocket.on('error', error => console.error('[Dtls] Additional Socket Session Error after handshake error:', error));
            }
            settleAttempt(optionalMessage ? optionalMessage : err);
          };
          dtlsClientSocket.once('error', handshakeErrorHandler);
        },
        (err, result) => {
          if (err) {
            return reject(err);
          }
          // result is currently undefined, but specified here to prevent
          // accidental omission later
          return resolve(result);
        }
      );
    });
  }

  /**
   * @description Provides the logic to add the new socket to the session
   * tracker map and set up necessary handlers: an idle timer that ends the
   * session, a 'data' handler that refreshes the timer and forwards the
   * message to received(), an 'error' logger and a one-shot 'close' cleanup.
   * @param {DtlsSocket} socket
   */
  commonNewSocketHandler(socket) {
    // Computed once so that cleanup on 'close' targets the same key the
    // session was registered under.
    const sessionKey = `${socket.remoteAddress}:${socket.remotePort}`;
    this._adapter.k4.logging.info(`[Dtls]/(commonNewSocketHandler) -- Creating new socketSession map entry for ${sessionKey}`);
    if (this._socketSessionMap.has(sessionKey)) {
      this._adapter.k4.logging.info(`[Dtls]/(commonNewSocketHandler) -- socketSession map entry already exists for ${sessionKey}, replacing`);
    }

    const expireSession = () => {
      socket.end();
      socket.removeAllListeners('data');
      socket.removeAllListeners('error');
    };
    const sessionEntry = {
      socketSession: socket,
      lastSeenTimer: setTimeout(expireSession, this._socketSessionCloseTimeoutMsec)
    };
    this._socketSessionMap.set(sessionKey, sessionEntry);

    socket.on('data', (msg) => {
      // Any traffic restarts the idle countdown for this session.
      clearTimeout(sessionEntry.lastSeenTimer);
      sessionEntry.lastSeenTimer = setTimeout(expireSession, this._socketSessionCloseTimeoutMsec);
      this.received(msg, {
        address: socket.remoteAddress,
        port: socket.remotePort
      });
    });
    socket.on('error', error => console.error('[Dtls] Socket Session Error:', error));
    socket.once('close', () => {
      console.info(`[Dtls] socket session was closed: ${sessionKey}`);
      clearTimeout(sessionEntry.lastSeenTimer);
      // Ensure that we clean up any socket sessions that may have
      // expired or have been manually terminated. A session that was
      // replaced by a newer one for the same peer must not evict the
      // newer entry when it eventually closes.
      if (this._socketSessionMap.get(sessionKey) === sessionEntry) {
        this._socketSessionMap.delete(sessionKey);
      }
    });
  }

  /**
   * @description Additional processing on Dtls Receiver/Server errors.
   * Currently, it closes and reopens the DTLS Server upon each error event
   * until the max allowable error count threshold is reached, after which
   * the server is closed without being reopened.
   */
  initializeReceiverErrorHandler() {
    this._emitter.on('dtlsReceiverError', (err) => {
      this._adapter.k4.logging.error(`DTLS Receiver/Server error: ${String(err)}`, err instanceof Error ? String(err.stack).split('\n') : null);
      if (this._receiverOnErrorCloseCount < this._maxNumRestartsOnError) {
        this._receiverOnErrorCloseCount += 1;
        this.close()
          .then(() => this.init())
          .catch((e) => {
            this._adapter.k4.logging.error(`Error on closing and re-opening DTLS Receiver/Server: ${String(e)}`, e instanceof Error ? String(e.stack).split('\n') : null);
            // NOTE: This will tie failures of the error recovery
            // mechanism itself back to the original error handling
            // stream
            this._emitter.emit('dtlsReceiverError', e);
          });
      } else {
        // In this case, close the Dtls Receiver/Server permanently if
        // not already closed
        this.close()
          .then(() => this._adapter.k4.logging.fatal('DTLS transport encountered Receiver/Server error too many times. No longer attempting to re-open'))
          .catch((e) => {
            // Without this handler a failing close() would surface as an
            // unhandled promise rejection.
            this._adapter.k4.logging.error(`Error on permanently closing DTLS Receiver/Server: ${String(e)}`, e instanceof Error ? String(e.stack).split('\n') : null);
          });
      }
    });
  }

  /**
   * @description Process incoming data by logging it and handing it to the
   * adapter together with the sender's host and port.
   * @param {Buffer} data The data buffer
   * @param {Object} rinfo Remote endpoint the data arrived from
   * @param {String} rinfo.address Remote address
   * @param {Number} rinfo.port Remote port
   * @private
   */
  received(data, rinfo) {
    this._adapter.k4.logging.debug('DTLS transport received', data.toString('hex'));
    this._adapter.received({
      host: rinfo.address,
      port: rinfo.port
    }, data);
  }

  /**
   * @description End every tracked DTLS session, wait for the close delay,
   * then close the Dtls Server, if one was created and is listening.
   * @returns {Promise}
   */
  close() {
    return Promise.resolve()
      .then(() => {
        this._socketSessionMap.forEach((elem) => {
          if (!elem) {
            return;
          }
          if (elem.socketSession) {
            console.log(`Closing DTLS session: ${elem.socketSession.remoteAddress}:${elem.socketSession.remotePort}`);
            elem.socketSession.end();
          }
          if (elem.lastSeenTimer) {
            clearTimeout(elem.lastSeenTimer);
          }
        });
        this._socketSessionMap.clear();
      })
      .then(() => new Promise((resolve) => {
        setTimeout(resolve, DTLS_TRANSPORT_CLOSE_DELAY_MS);
      }))
      .then(() => new Promise((resolve) => {
        if (this._dtlsServer && this._dtlsServer.listening) {
          this._dtlsServer.close(() => resolve());
        } else {
          resolve();
        }
      }));
  }
}
// Expose the Dtls transport class as this module's export.
module.exports = Dtls;