const UUID = require('uuid/v4')
const Client = require('ws')
const url = require('url')
const _ = require('lodash')
const { EventEmitter2 } = require('eventemitter2')
const { join } = require('path')
const Message = require(join(__dirname, '..', 'message'))
/**
* Web socket to enable communication as sub pub
*/
class WebSocketClient extends EventEmitter2 {
/**
* @constructor
* @see {@link https://github.com/websockets/ws/blob/master/doc/ws.md#class-websocket|ws}
* @param {Object} [options] options object
* @param {String} [options.uri=ws://localhost:9500] Full URI of server to connect to
* @param {String} [options.id=UUID] Unique ID of client, defaults to random UUID
* @param {Serializer} [options.serializer] singleton with an encode and decode object to be used
* @param {Function} [options.serializer.encode=msgpack5/encode] encoder function
* @param {Function} [options.serializer.decode=msgpack5/decode] decoder function
* @param {logger|Boolean} [options.logger={logger}] Optional logger to pass in, or use a default instance of {logger} set to debug level
* @param {Object} [options.options] See {@link https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options|WS options}
* @fires WebSocketClient#ready
* @fires WebSocketClient#close
* @fires WebSocketClient#connect
* @returns {WebSocketClient}
*/
constructor ({ uri = 'ws://localhost:9500', id, serializer, options, logger = false } = {}) {
super({
wildcard: true,
newListener: true
})
let _this = this
/**
* @prop {String} id Current `id` of client. Defaults to random `UUID()`
*/
this.id = id || UUID()
/**
* @prop {logger} logger the logger of this instance
*/
if (_.isBoolean(logger)) {
logger = new (require(join(__dirname, '..', 'logger')))({
level: logger === true ? 'debug' : 'fatal',
pretty: logger
})
}
this.logger = logger
/**
* @prop {String} blacklistNamespaces array of namespaces that will be filtered out in pub()
*/
this.blacklist = [
'_pub',
'_sub',
'_unsub',
'close',
'connect',
'error',
'message',
'open',
'ping',
'pong',
'reconnect',
'ready',
'unexpected-response'
]
/**
* @prop {String} uri `uri` which `client` uses to connect
*/
this.uri = url.parse(uri)
this.uri.auth = this.id
this.uri = this.uri.format()
/**
* @prop {Array} subscribed Curret subscribed events. Defaults to `['connect']`
*/
this.subscribed = []
/**
* @prop {Message} message
*/
this.message = new Message(serializer)
/**
* @prop {Object} WS client
* @see {@link https://github.com/websockets/ws|WS}
*/
this.client = new Client(this.uri, this.options)
this.client
.on('open', () => {
if (_this.client.readyState === 1) {
/**
* client has opened and is ready
* @event WebSocketClient#ready
*/
_this.emit('ready')
_this._send('_sub', _this.subscribed)
}
})
/**
* client has closed connection
* @event WebSocketClient#close
*/
.on('close', () => _this.emit('close'))
/**
* client has received a connect event
* @event WebSocketClient#connect
* @type {String}
*/
.on('message', function (msg) {
let unpacked = _this.message.unpack(msg)
if (unpacked) {
let { ns, pl } = unpacked
_this.logger.debug({
name: 'message', pl, ns
})
_this.emit(ns, pl)
}
})
this.on('removeListener', function (ns, fn) {
if (_this._isSubscribed(ns)) {
_.remove(_this.subscribed, function (n) {
return n === ns
})
_this._send('_unsub', ns)
}
})
this.on('newListener', function (ns, fn) {
if (!_this._isSubscribed(ns) && !_this._isBlacklisted(ns)) {
_this.subscribed = _.union(_this.subscribed, [ns])
_this._send('_sub', ns)
}
})
return _this
}
/**
* @memberof WebSocketClient
* @prop {Boolean} connecting client is connecting
*/
get connecting () {
return this.client.readyState === 0
}
/**
* @memberof WebSocketClient
* @prop {Boolean} connecting client is open
*/
get open () {
return this.client.readyState === 1
}
/**
* @memberof WebSocketClient
* @prop {Boolean} connecting client is closeing
*/
get closing () {
return this.client.readyState === 2
}
/**
* @memberof WebSocketClient
* @prop {Boolean} connecting client is closed
*/
get closed () {
return this.client.readyState === 3
}
/**
* @description pub Publish a payload to a given event namespace
* @param {String} namespace - delimited namespace to publish on
* @param {Number|String|Object} payload='' - the payload object to publish
* @param {Function} [callback] - optional callback for message sent
* @returns {WebSocketClient}
* @memberof WebSocketClient
*/
pub (namespace, payload = '', callback = function () {}) {
if (this._isBlacklisted(namespace)) {
callback(new Error(`Cannot publish on blacklisted namespace: ${namespace}`))
return this
}
this._send(namespace, payload, callback)
return this
}
/**
* @description close the client connection
* @returns {WebSocketClient}
* @memberof WebSocketClient
*/
close () {
if (this.client) {
this.client.close(1000, '', {
keepClosed: true
})
}
return this
}
/**
* @private
*/
_send (namespace, payload, callback = function () {}) {
let packed = this.message.pack(namespace, payload)
if (packed) {
try {
this.client.send(packed, callback)
} catch (err) {
this.client.emit('error', err)
callback(err)
}
} else {
callback(new Error('Packing payload did not succeed'))
}
return this
}
/**
* @private
*/
_isSubscribed (ns) {
return _.includes(this.subscribed, ns)
}
/**
* @private
*/
_isBlacklisted (ns) {
return _.includes(this.blacklist, ns)
}
}
module.exports = WebSocketClient