mirror of
https://github.com/DanielnetoDotCom/YouPHPTube
synced 2025-10-03 17:59:55 +02:00
259 lines
6.6 KiB
JavaScript
259 lines
6.6 KiB
JavaScript
/*! simple-websocket. MIT License. Feross Aboukhadijeh <https://feross.org/opensource> */
|
|
/* global WebSocket */
|
|
|
|
const debug = require('debug')('simple-websocket')
|
|
const randombytes = require('randombytes')
|
|
const stream = require('readable-stream')
|
|
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
|
|
const ws = require('ws') // websockets in node - will be empty object in browser
|
|
|
|
const _WebSocket = typeof ws !== 'function' ? WebSocket : ws
|
|
|
|
const MAX_BUFFERED_AMOUNT = 64 * 1024
|
|
|
|
/**
|
|
* WebSocket. Same API as node core `net.Socket`. Duplex stream.
|
|
* @param {Object} opts
|
|
* @param {string=} opts.url websocket server url
|
|
* @param {string=} opts.socket raw websocket instance to wrap
|
|
*/
|
|
class Socket extends stream.Duplex {
|
|
constructor (opts = {}) {
|
|
// Support simple usage: `new Socket(url)`
|
|
if (typeof opts === 'string') {
|
|
opts = { url: opts }
|
|
}
|
|
|
|
opts = Object.assign({
|
|
allowHalfOpen: false
|
|
}, opts)
|
|
|
|
super(opts)
|
|
|
|
if (opts.url == null && opts.socket == null) {
|
|
throw new Error('Missing required `url` or `socket` option')
|
|
}
|
|
if (opts.url != null && opts.socket != null) {
|
|
throw new Error('Must specify either `url` or `socket` option, not both')
|
|
}
|
|
|
|
this._id = randombytes(4).toString('hex').slice(0, 7)
|
|
this._debug('new websocket: %o', opts)
|
|
|
|
this.connected = false
|
|
this.destroyed = false
|
|
|
|
this._chunk = null
|
|
this._cb = null
|
|
this._interval = null
|
|
|
|
if (opts.socket) {
|
|
this.url = opts.socket.url
|
|
this._ws = opts.socket
|
|
this.connected = opts.socket.readyState === _WebSocket.OPEN
|
|
} else {
|
|
this.url = opts.url
|
|
try {
|
|
if (typeof ws === 'function') {
|
|
// `ws` package accepts options
|
|
this._ws = new _WebSocket(opts.url, null, {
|
|
...opts,
|
|
encoding: undefined // encoding option breaks ws internals
|
|
})
|
|
} else {
|
|
this._ws = new _WebSocket(opts.url)
|
|
}
|
|
} catch (err) {
|
|
queueMicrotask(() => this.destroy(err))
|
|
return
|
|
}
|
|
}
|
|
|
|
this._ws.binaryType = 'arraybuffer'
|
|
|
|
if (opts.socket && this.connected) {
|
|
queueMicrotask(() => this._handleOpen())
|
|
} else {
|
|
this._ws.onopen = () => this._handleOpen()
|
|
}
|
|
|
|
this._ws.onmessage = event => this._handleMessage(event)
|
|
this._ws.onclose = () => this._handleClose()
|
|
this._ws.onerror = err => this._handleError(err)
|
|
|
|
this._handleFinishBound = () => this._handleFinish()
|
|
this.once('finish', this._handleFinishBound)
|
|
}
|
|
|
|
/**
|
|
* Send text/binary data to the WebSocket server.
|
|
* @param {TypedArrayView|ArrayBuffer|Buffer|string|Blob|Object} chunk
|
|
*/
|
|
send (chunk) {
|
|
this._ws.send(chunk)
|
|
}
|
|
|
|
// TODO: Delete this method once readable-stream is updated to contain a default
|
|
// implementation of destroy() that automatically calls _destroy()
|
|
// See: https://github.com/nodejs/readable-stream/issues/283
|
|
destroy (err) {
|
|
this._destroy(err, () => {})
|
|
}
|
|
|
|
_destroy (err, cb) {
|
|
if (this.destroyed) return
|
|
|
|
this._debug('destroy (error: %s)', err && (err.message || err))
|
|
|
|
this.readable = this.writable = false
|
|
if (!this._readableState.ended) this.push(null)
|
|
if (!this._writableState.finished) this.end()
|
|
|
|
this.connected = false
|
|
this.destroyed = true
|
|
|
|
clearInterval(this._interval)
|
|
this._interval = null
|
|
this._chunk = null
|
|
this._cb = null
|
|
|
|
if (this._handleFinishBound) {
|
|
this.removeListener('finish', this._handleFinishBound)
|
|
}
|
|
this._handleFinishBound = null
|
|
|
|
if (this._ws) {
|
|
const ws = this._ws
|
|
const onClose = () => {
|
|
ws.onclose = null
|
|
}
|
|
if (ws.readyState === _WebSocket.CLOSED) {
|
|
onClose()
|
|
} else {
|
|
try {
|
|
ws.onclose = onClose
|
|
ws.close()
|
|
} catch (err) {
|
|
onClose()
|
|
}
|
|
}
|
|
|
|
ws.onopen = null
|
|
ws.onmessage = null
|
|
ws.onerror = () => {}
|
|
}
|
|
this._ws = null
|
|
|
|
if (err) this.emit('error', err)
|
|
this.emit('close')
|
|
cb()
|
|
}
|
|
|
|
_read () {}
|
|
|
|
_write (chunk, encoding, cb) {
|
|
if (this.destroyed) return cb(new Error('cannot write after socket is destroyed'))
|
|
|
|
if (this.connected) {
|
|
try {
|
|
this.send(chunk)
|
|
} catch (err) {
|
|
return this.destroy(err)
|
|
}
|
|
if (typeof ws !== 'function' && this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
|
|
this._debug('start backpressure: bufferedAmount %d', this._ws.bufferedAmount)
|
|
this._cb = cb
|
|
} else {
|
|
cb(null)
|
|
}
|
|
} else {
|
|
this._debug('write before connect')
|
|
this._chunk = chunk
|
|
this._cb = cb
|
|
}
|
|
}
|
|
|
|
_handleOpen () {
|
|
if (this.connected || this.destroyed) return
|
|
this.connected = true
|
|
|
|
if (this._chunk) {
|
|
try {
|
|
this.send(this._chunk)
|
|
} catch (err) {
|
|
return this.destroy(err)
|
|
}
|
|
this._chunk = null
|
|
this._debug('sent chunk from "write before connect"')
|
|
|
|
const cb = this._cb
|
|
this._cb = null
|
|
cb(null)
|
|
}
|
|
|
|
// Backpressure is not implemented in Node.js. The `ws` module has a buggy
|
|
// `bufferedAmount` property. See: https://github.com/websockets/ws/issues/492
|
|
if (typeof ws !== 'function') {
|
|
this._interval = setInterval(() => this._onInterval(), 150)
|
|
if (this._interval.unref) this._interval.unref()
|
|
}
|
|
|
|
this._debug('connect')
|
|
this.emit('connect')
|
|
}
|
|
|
|
_handleMessage (event) {
|
|
if (this.destroyed) return
|
|
let data = event.data
|
|
if (data instanceof ArrayBuffer) data = Buffer.from(data)
|
|
this.push(data)
|
|
}
|
|
|
|
_handleClose () {
|
|
if (this.destroyed) return
|
|
this._debug('on close')
|
|
this.destroy()
|
|
}
|
|
|
|
_handleError (_) {
|
|
this.destroy(new Error(`Error connecting to ${this.url}`))
|
|
}
|
|
|
|
// When stream finishes writing, close socket. Half open connections are not
|
|
// supported.
|
|
_handleFinish () {
|
|
if (this.destroyed) return
|
|
|
|
// Wait a bit before destroying so the socket flushes.
|
|
// TODO: is there a more reliable way to accomplish this?
|
|
const destroySoon = () => {
|
|
setTimeout(() => this.destroy(), 1000)
|
|
}
|
|
|
|
if (this.connected) {
|
|
destroySoon()
|
|
} else {
|
|
this.once('connect', destroySoon)
|
|
}
|
|
}
|
|
|
|
_onInterval () {
|
|
if (!this._cb || !this._ws || this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
|
|
return
|
|
}
|
|
this._debug('ending backpressure: bufferedAmount %d', this._ws.bufferedAmount)
|
|
const cb = this._cb
|
|
this._cb = null
|
|
cb(null)
|
|
}
|
|
|
|
_debug () {
|
|
const args = [].slice.call(arguments)
|
|
args[0] = '[' + this._id + '] ' + args[0]
|
|
debug.apply(null, args)
|
|
}
|
|
}
|
|
|
|
Socket.WEBSOCKET_SUPPORT = !!_WebSocket
|
|
|
|
module.exports = Socket
|