/*! simple-websocket. MIT License. Feross Aboukhadijeh */
/* global WebSocket */

const debug = require('debug')('simple-websocket')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const ws = require('ws') // websockets in node - will be empty object in browser

// Use the `ws` package when it is available (Node.js); otherwise fall back to
// the global `WebSocket`. The `typeof` guard avoids a ReferenceError at module
// load when neither exists, so that `Socket.WEBSOCKET_SUPPORT` can be `false`.
const _WebSocket = typeof ws === 'function'
  ? ws
  : (typeof WebSocket !== 'undefined' ? WebSocket : undefined)

// Writes are held back (browser only) while `bufferedAmount` exceeds this.
const MAX_BUFFERED_AMOUNT = 64 * 1024

/**
 * WebSocket. Same API as node core `net.Socket`. Duplex stream.
 *
 * Emits 'connect' once the underlying websocket is open, 'error' when it is
 * destroyed with an error, and 'close' when it is destroyed.
 *
 * @param {Object|string} opts options object, or the server url as a string
 * @param {string=} opts.url websocket server url
 * @param {Object=} opts.socket raw websocket instance to wrap
 */
class Socket extends stream.Duplex {
  constructor (opts = {}) {
    // Support simple usage: `new Socket(url)`
    if (typeof opts === 'string') {
      opts = { url: opts }
    }

    opts = Object.assign({ allowHalfOpen: false }, opts)

    super(opts)

    if (opts.url == null && opts.socket == null) {
      throw new Error('Missing required `url` or `socket` option')
    }
    if (opts.url != null && opts.socket != null) {
      throw new Error('Must specify either `url` or `socket` option, not both')
    }

    this._id = randombytes(4).toString('hex').slice(0, 7)
    this._debug('new websocket: %o', opts)

    this.connected = false
    this.destroyed = false

    // Whether `_handleOpen()` has already run. Tracked separately from
    // `connected` because a wrapped socket that is already OPEN is marked
    // `connected` synchronously, before `_handleOpen()` runs in a microtask.
    this._opened = false
    this._chunk = null // chunk written before the socket was connected
    this._cb = null // pending write callback (pre-connect write or backpressure)
    this._interval = null // backpressure polling interval (browser only)
    this._finishTimeout = null // delayed destroy scheduled by `_handleFinish()`

    if (opts.socket) {
      this.url = opts.socket.url
      this._ws = opts.socket
      this.connected = opts.socket.readyState === _WebSocket.OPEN
    } else {
      this.url = opts.url
      try {
        if (typeof ws === 'function') {
          // `ws` package accepts options
          this._ws = new _WebSocket(opts.url, null, {
            ...opts,
            encoding: undefined // encoding option breaks ws internals
          })
        } else {
          this._ws = new _WebSocket(opts.url)
        }
      } catch (err) {
        // Report the failure asynchronously so callers can attach listeners.
        queueMicrotask(() => this.destroy(err))
        return
      }
    }

    this._ws.binaryType = 'arraybuffer'

    if (opts.socket && this.connected) {
      // Already open: no `onopen` will fire, so run the open handler ourselves.
      queueMicrotask(() => this._handleOpen())
    } else {
      this._ws.onopen = () => this._handleOpen()
    }

    this._ws.onmessage = event => this._handleMessage(event)
    this._ws.onclose = () => this._handleClose()
    this._ws.onerror = err => this._handleError(err)

    this._handleFinishBound = () => this._handleFinish()
    this.once('finish', this._handleFinishBound)
  }

  /**
   * Send text/binary data to the WebSocket server.
   * @param {TypedArrayView|ArrayBuffer|Buffer|string|Blob|Object} chunk
   */
  send (chunk) {
    this._ws.send(chunk)
  }

  // TODO: Delete this method once readable-stream is updated to contain a default
  // implementation of destroy() that automatically calls _destroy()
  // See: https://github.com/nodejs/readable-stream/issues/283
  destroy (err) {
    this._destroy(err, () => {})
  }

  /**
   * Tear down the stream and the underlying websocket. Does nothing if the
   * socket is already destroyed. Emits 'error' (when `err` is given) and then
   * 'close'.
   * @param {Error=} err error to emit, if any
   * @param {function} cb called after 'close' has been emitted
   */
  _destroy (err, cb) {
    if (this.destroyed) return

    this._debug('destroy (error: %s)', err && (err.message || err))

    this.readable = this.writable = false
    if (!this._readableState.ended) this.push(null)
    if (!this._writableState.finished) this.end()

    this.connected = false
    this.destroyed = true

    clearInterval(this._interval)
    this._interval = null
    // `this.end()` above may have fired 'finish' and scheduled a delayed
    // destroy; cancel it so no timer outlives the socket.
    clearTimeout(this._finishTimeout)
    this._finishTimeout = null
    this._chunk = null
    this._cb = null

    if (this._handleFinishBound) {
      this.removeListener('finish', this._handleFinishBound)
    }
    this._handleFinishBound = null

    if (this._ws) {
      const socket = this._ws
      const onClose = () => {
        socket.onclose = null
      }
      if (socket.readyState === _WebSocket.CLOSED) {
        onClose()
      } else {
        try {
          socket.onclose = onClose
          socket.close()
        } catch (closeErr) {
          // Best effort: closing failed, just detach the handler.
          onClose()
        }
      }

      socket.onopen = null
      socket.onmessage = null
      // Keep a no-op error handler on the discarded socket.
      socket.onerror = () => {}
    }
    this._ws = null

    if (err) this.emit('error', err)
    this.emit('close')
    cb()
  }

  // Incoming data is pushed from `_handleMessage()`, so there is nothing to do.
  _read () {}

  /**
   * Writable implementation. Sends the chunk right away when connected;
   * otherwise stores it (and its callback) until the socket opens.
   * @param {*} chunk
   * @param {string} encoding unused
   * @param {function} cb write callback
   */
  _write (chunk, encoding, cb) {
    if (this.destroyed) return cb(new Error('cannot write after socket is destroyed'))

    if (this.connected) {
      try {
        this.send(chunk)
      } catch (err) {
        return this.destroy(err)
      }
      if (typeof ws !== 'function' && this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
        // Hold the callback; `_onInterval()` releases it once the buffer drains.
        this._debug('start backpressure: bufferedAmount %d', this._ws.bufferedAmount)
        this._cb = cb
      } else {
        cb(null)
      }
    } else {
      this._debug('write before connect')
      this._chunk = chunk
      this._cb = cb
    }
  }

  /**
   * Runs once when the websocket is open: flushes a chunk written before
   * connect, starts backpressure polling (browser only) and emits 'connect'.
   */
  _handleOpen () {
    // Guard on `_opened`, not `connected`: a wrapped socket that was already
    // OPEN has `connected === true` before this handler has run.
    if (this._opened || this.destroyed) return
    this._opened = true
    this.connected = true

    // Compare against null so that a falsy chunk (e.g. an empty string) is
    // still sent and its write callback is still invoked.
    if (this._chunk != null) {
      try {
        this.send(this._chunk)
      } catch (err) {
        return this.destroy(err)
      }
      this._chunk = null
      this._debug('sent chunk from "write before connect"')

      const cb = this._cb
      this._cb = null
      cb(null)
    }

    // Backpressure is not implemented in Node.js. The `ws` module has a buggy
    // `bufferedAmount` property. See: https://github.com/websockets/ws/issues/492
    if (typeof ws !== 'function') {
      this._interval = setInterval(() => this._onInterval(), 150)
      if (this._interval.unref) this._interval.unref()
    }

    this._debug('connect')
    this.emit('connect')
  }

  /**
   * Pushes an incoming message into the readable side. `ArrayBuffer` payloads
   * are converted to `Buffer`.
   * @param {Object} event message event; its `data` property is the payload
   */
  _handleMessage (event) {
    if (this.destroyed) return
    let data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    this.push(data)
  }

  // The underlying websocket closed: destroy the stream.
  _handleClose () {
    if (this.destroyed) return
    this._debug('on close')
    this.destroy()
  }

  // The underlying websocket reported an error: destroy with a generic error
  // (the event argument is ignored).
  _handleError (_) {
    this.destroy(new Error(`Error connecting to ${this.url}`))
  }

  // When stream finishes writing, close socket. Half open connections are not
  // supported.
  _handleFinish () {
    if (this.destroyed) return

    // Wait a bit before destroying so the socket flushes.
    // TODO: is there a more reliable way to accomplish this?
    const destroySoon = () => {
      // Keep the handle so `_destroy()` can cancel the timer.
      this._finishTimeout = setTimeout(() => this.destroy(), 1000)
    }

    if (this.connected) {
      destroySoon()
    } else {
      this.once('connect', destroySoon)
    }
  }

  // Polled while connected (browser only): releases a write callback held for
  // backpressure once `bufferedAmount` is back under the limit.
  _onInterval () {
    if (!this._cb || !this._ws || this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
      return
    }
    this._debug('ending backpressure: bufferedAmount %d', this._ws.bufferedAmount)
    const cb = this._cb
    this._cb = null
    cb(null)
  }

  // Debug logger that prefixes the format string with this socket's id.
  _debug (...args) {
    args[0] = '[' + this._id + '] ' + args[0]
    debug(...args)
  }
}

Socket.WEBSOCKET_SUPPORT = !!_WebSocket

module.exports = Socket