const UUID = require('uuid/v4')
const _ = require('lodash')
const { Buffer } = require('safe-buffer')
const { Server } = require('ws')
const { EventEmitter2 } = require('eventemitter2')
const { createHash } = require('crypto')
const { join } = require('path')
const Message = require(join(__dirname, '..', 'message'))
const glob = require('minimatch')
const heartbeats = require('ws-heartbeats')
/**
 * WebSocket server that provides publish/subscribe (pub/sub) communication with its clients
 */
class WebSocketServer extends EventEmitter2 {
  /**
   * Creates an instance of WebSocketServer.
   * @see {@link https://github.com/websockets/ws/blob/master/doc/ws.md#class-websocketserver|ws}
   * @constructor
   * @param {Object} [opts] `WS` server options, forwarded to the `ws` `Server`; the object passed in is not modified
   * @param {Number} [opts.port] server port
   * @param {String} [opts.host] server host
   * @param {Boolean} [opts.clientTracking=true] client tracking
   * @param {Boolean} [opts.heartbeats=true] use WS ping/pong heartbeats
   * @param {*} [opts.serializer] handed to the `Message` constructor
   * @param {logger|Boolean} [opts.logger=false] logger instance to use, or a boolean: `true` builds a default {logger} at `debug` level with pretty output, `false` builds one at `fatal` level
   * @param {Function} [callback] forwarded to the `ws` `Server` constructor
   * @fires WebSocketServer#connection
   * @fires WebSocketServer#reconnect
   * @fires WebSocketServer#close
   * @fires WebSocketServer#error
   */
  constructor (opts = {}, callback = function () {}) {
    super({ wildcard: true })
    // fill in defaults on a copy so the caller's options object is never mutated
    const options = _.defaults({}, opts, {
      clientTracking: true,
      heartbeats: true,
      logger: false
    })
    /**
     * @prop {Message} message packs outgoing and unpacks incoming messages
     */
    this.message = new Message(options.serializer)
    /**
     * @prop {Object} connections dictionary of known clients, keyed by client ID
     */
    this.connections = {}
    /**
     * @prop {logger} logger the logger of this instance
     */
    if (_.isBoolean(options.logger)) {
      // the logger module is only loaded when no instance was supplied
      const Logger = require(join(__dirname, '..', 'logger'))
      options.logger = new Logger({
        level: options.logger === true ? 'debug' : 'fatal',
        pretty: options.logger
      })
    }
    this.logger = options.logger
    /**
     * @prop {Object} server instance
     * @see {@link https://github.com/websockets/ws|WS}
     */
    this.server = new Server(options, callback)
    this.server
      .on('connection', (socket, { headers } = {}) => {
        if (options.heartbeats) {
          // start ping/pong heartbeats
          heartbeats(socket)
        }
        // use the decoded http basic auth credentials as ID source;
        // clients without an authorization header get a random UUID
        const rawID = _.has(headers, 'authorization')
          ? Buffer.from(headers.authorization.replace('Basic ', ''), 'base64').toString()
          : UUID()
        const ID = makeSHA(rawID)
        if (Object.prototype.hasOwnProperty.call(this.connections, ID)) {
          // has previously connected: attach the new socket to the existing record
          // TODO: wait timer cancel
          const conn = this.connections[ID]
          conn.socket = socket
          conn.open = true
          const encoded = this.message.pack('reconnect', ID)
          if (encoded) {
            socket.send(encoded)
          }
          /**
           * `id` of reconnected client
           * @event WebSocketServer#reconnect
           * @type {String}
           */
          this.emit('reconnect', ID)
        } else {
          // first time this ID is seen: register it under its SHA
          this.connections[ID] = {
            id: ID,
            open: true,
            namespaces: ['connect', 'reconnect'],
            socket
          }
          const encoded = this.message.pack('connect', ID)
          if (encoded) {
            socket.send(encoded)
          }
          /**
           * `id` of connected client
           * @event WebSocketServer#connection
           * @type {String}
           */
          this.emit('connection', ID)
        }
        // marks the connection closed and resets its subscriptions to the base set
        socket.cleanupClose = () => {
          const conn = this.connections[ID]
          // a reconnect may already have replaced this socket; a late close/error
          // from the stale socket must not close the live connection or drop its
          // subscriptions
          if (!conn || conn.socket !== socket) {
            return
          }
          conn.open = false
          conn.namespaces = ['connect', 'reconnect']
          // TODO: start close timer to kill connection
        }
        /**
         * `id` of closed client
         * @event WebSocketServer#close
         * @type {String}
         */
        socket.on('close', () => {
          this.emit('close', ID)
          socket.cleanupClose()
        })
        /**
         * `error` object from client disconnection (most likely)
         * @event WebSocketServer#error
         * @type {Object}
         */
        socket.on('error', (err) => {
          this.emit('error', err)
          socket.cleanupClose()
        })
        // decode message, should be an array
        // last array item is the payload, everything before is the namespace joined by `.`
        // example: ['person', 'name', { first: 'John', last: 'Smith' }]
        socket.on('message', (data) => {
          const decoded = this.message.unpack(data)
          if (!decoded) {
            return
          }
          const { ns, pl } = decoded
          const conn = this.connections[ID]
          this.logger.debug({
            name: 'message', id: ID, ns, pl
          })
          switch (ns) {
            case '_sub': {
              // payload is a single namespace or an array of namespaces
              conn.namespaces = _.union(conn.namespaces, _.isArray(pl) ? pl : [pl])
              this.logger.debug({
                name: '_sub',
                namespaces: conn.namespaces
              })
              break
            }
            case '_unsub': {
              // accept the same payload shapes as `_sub` (one namespace or an array)
              const unwanted = _.isArray(pl) ? pl : [pl]
              _.remove(conn.namespaces, (n) => unwanted.indexOf(n) !== -1)
              this.logger.debug({
                name: '_unsub',
                namespaces: conn.namespaces
              })
              break
            }
            default: {
              // NOTE(review): `ns` is supplied by the client and emitted as-is, so a
              // client can trigger server-level event names such as `error` or
              // `close` — consider reserving those names
              this.emit(ns, pl, ID)
              break
            }
          }
        })
      })
      .on('error', (err) => {
        this.emit('error', err)
      })
  }

  /**
   * Publish data to a given namespace.
   * The packed message is sent once to every open connection that has at least
   * one subscribed namespace pattern matching `namespace`.
   * A failing `send` is reported through the `error` event.
   * @param {String} namespace event namespace
   * @param {any} data the data to publish
   * @fires WebSocketServer#error
   * @returns {WebSocketServer} this instance
   * @memberof WebSocketServer
   */
  pub (namespace, data) {
    const packed = this.message.pack(namespace, data)
    this.logger.debug({
      name: 'pub', namespace, data
    })
    if (packed) {
      _.forEach(this.connections, ({ namespaces, socket, open }) => {
        // deliver at most once per client, even when several of its
        // subscription patterns match the published namespace
        const subscribed = open && _.some(namespaces, (pattern) => glob(namespace, pattern))
        if (!subscribed) {
          return
        }
        this.logger.debug({
          name: 'pub', namespace, data
        })
        try {
          socket.send(packed)
        } catch (err) {
          // keep the original error (and its stack) instead of re-wrapping it
          this.emit('error', err instanceof Error ? err : new Error(err))
        }
      })
    }
    return this
  }

  /**
   * @description close the server
   * @param {Function} [cb] callback passed to the underlying server's `close`
   * @returns {WebSocketServer} this instance
   * @memberof WebSocketServer
   */
  close (cb = function () {}) {
    if (this.server) {
      this.server.close(cb)
    }
    return this
  }
}
/**
 * Compute the SHA-1 hash of a string.
 * @param {String} str the text to hash
 * @returns {String} the hex-encoded digest
 */
function makeSHA (str) {
  return createHash('sha1').update(str).digest('hex')
}
// the module's sole export is the WebSocketServer class itself
module.exports = WebSocketServer