1
0
Fork 0
mirror of https://github.com/DanielnetoDotCom/YouPHPTube synced 2025-10-03 09:49:28 +02:00
Oinktube/node_modules/simple-websocket/index.js
2023-02-17 15:04:26 -03:00

259 lines
6.6 KiB
JavaScript

/*! simple-websocket. MIT License. Feross Aboukhadijeh <https://feross.org/opensource> */
/* global WebSocket */
const debug = require('debug')('simple-websocket')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const ws = require('ws') // websockets in node - will be empty object in browser
// Pick the WebSocket implementation: the `ws` package when it resolved to a
// constructor (Node.js), otherwise the global `WebSocket`. The `typeof` guard
// keeps module load from throwing a ReferenceError when no global `WebSocket`
// exists, so `Socket.WEBSOCKET_SUPPORT` can actually report `false`.
const _WebSocket = typeof ws === 'function'
  ? ws
  : (typeof WebSocket !== 'undefined' ? WebSocket : undefined)
// `bufferedAmount` threshold (in bytes) above which write callbacks are held
// back to apply backpressure.
const MAX_BUFFERED_AMOUNT = 64 * 1024
/**
 * WebSocket. Same API as node core `net.Socket`. Duplex stream.
 *
 * Exactly one of `opts.url` or `opts.socket` must be given. The whole options
 * object is also forwarded to the `stream.Duplex` constructor and, when the
 * `ws` package is in use, to the `ws` constructor.
 *
 * @param {Object|string} opts options object, or the server url as a string
 * @param {string=} opts.url websocket server url
 * @param {WebSocket=} opts.socket raw websocket instance to wrap
 * @throws {Error} if neither or both of `url` and `socket` are specified
 */
class Socket extends stream.Duplex {
  constructor (opts = {}) {
    // Support simple usage: `new Socket(url)`
    if (typeof opts === 'string') {
      opts = { url: opts }
    }

    opts = Object.assign({
      allowHalfOpen: false
    }, opts)

    super(opts)

    if (opts.url == null && opts.socket == null) {
      throw new Error('Missing required `url` or `socket` option')
    }
    if (opts.url != null && opts.socket != null) {
      throw new Error('Must specify either `url` or `socket` option, not both')
    }

    // Short id used only to prefix debug output for this instance.
    this._id = randombytes(4).toString('hex').slice(0, 7)
    this._debug('new websocket: %o', opts)

    this.connected = false
    this.destroyed = false

    this._chunk = null // chunk written before the socket connected
    this._cb = null // write callback being held back (pre-connect or backpressure)
    this._interval = null // backpressure polling timer (non-`ws` environments only)

    if (opts.socket) {
      this.url = opts.socket.url
      this._ws = opts.socket
      this.connected = opts.socket.readyState === _WebSocket.OPEN
    } else {
      this.url = opts.url
      try {
        if (typeof ws === 'function') {
          // `ws` package accepts options
          this._ws = new _WebSocket(opts.url, null, {
            ...opts,
            encoding: undefined // encoding option breaks ws internals
          })
        } else {
          this._ws = new _WebSocket(opts.url)
        }
      } catch (err) {
        // Report construction failures asynchronously, via destroy(), instead
        // of throwing out of the constructor.
        queueMicrotask(() => this.destroy(err))
        return
      }
    }

    this._ws.binaryType = 'arraybuffer'

    if (opts.socket && this.connected) {
      // A wrapped socket that is already open will not fire `onopen`, so run
      // the open handler manually on the next microtask.
      queueMicrotask(() => this._handleOpen())
    } else {
      this._ws.onopen = () => this._handleOpen()
    }

    this._ws.onmessage = event => this._handleMessage(event)
    this._ws.onclose = () => this._handleClose()
    this._ws.onerror = err => this._handleError(err)

    // Keep a reference to the listener so _destroy() can remove it.
    this._handleFinishBound = () => this._handleFinish()
    this.once('finish', this._handleFinishBound)
  }

  /**
   * Send text/binary data to the WebSocket server.
   * @param {TypedArrayView|ArrayBuffer|Buffer|string|Blob|Object} chunk
   */
  send (chunk) {
    this._ws.send(chunk)
  }

  // TODO: Delete this method once readable-stream is updated to contain a default
  // implementation of destroy() that automatically calls _destroy()
  // See: https://github.com/nodejs/readable-stream/issues/283
  destroy (err) {
    this._destroy(err, () => {})
  }

  /**
   * Tear down the stream and the underlying websocket. Only the first call
   * does any work; later calls just invoke `cb`.
   * @param {Error=} err if given, emitted as an 'error' event before 'close'
   * @param {function} cb invoked once teardown has been handled
   */
  _destroy (err, cb) {
    if (this.destroyed) {
      // Already torn down: still honour the callback contract.
      cb()
      return
    }

    this._debug('destroy (error: %s)', err && (err.message || err))

    this.readable = this.writable = false
    if (!this._readableState.ended) this.push(null)
    if (!this._writableState.finished) this.end()

    this.connected = false
    this.destroyed = true

    clearInterval(this._interval)
    this._interval = null
    this._chunk = null
    this._cb = null

    if (this._handleFinishBound) {
      this.removeListener('finish', this._handleFinishBound)
    }
    this._handleFinishBound = null

    if (this._ws) {
      const ws = this._ws
      const onClose = () => {
        ws.onclose = null
      }
      if (ws.readyState === _WebSocket.CLOSED) {
        onClose()
      } else {
        try {
          ws.onclose = onClose
          ws.close()
        } catch (closeErr) {
          // close() failed; just detach the handler.
          onClose()
        }
      }

      ws.onopen = null
      ws.onmessage = null
      // A no-op handler is installed rather than null -- presumably so that
      // errors raised after teardown are ignored.
      ws.onerror = () => {}
    }
    this._ws = null

    if (err) this.emit('error', err)
    this.emit('close')
    cb()
  }

  // Incoming data is pushed from _handleMessage(), so there is nothing to do here.
  _read () {}

  /**
   * Writable-side implementation. Sends immediately when connected, otherwise
   * parks the chunk and its callback until the socket opens.
   * @param {*} chunk
   * @param {string} encoding unused
   * @param {function(?Error)} cb
   */
  _write (chunk, encoding, cb) {
    if (this.destroyed) return cb(new Error('cannot write after socket is destroyed'))

    if (this.connected) {
      try {
        this.send(chunk)
      } catch (err) {
        return this.destroy(err)
      }
      if (typeof ws !== 'function' && this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
        // Hold the callback; _onInterval() releases it once the buffer drains.
        this._debug('start backpressure: bufferedAmount %d', this._ws.bufferedAmount)
        this._cb = cb
      } else {
        cb(null)
      }
    } else {
      this._debug('write before connect')
      this._chunk = chunk
      this._cb = cb
    }
  }

  // Runs once when the websocket opens: flushes any write made before connect,
  // starts backpressure polling where applicable, and emits 'connect'.
  _handleOpen () {
    if (this.connected || this.destroyed) return
    this.connected = true

    // Key on the pending callback rather than the chunk's truthiness: a falsy
    // chunk (e.g. an empty string) must still be sent and acknowledged,
    // otherwise the writable side never makes progress.
    if (this._cb) {
      try {
        this.send(this._chunk)
      } catch (err) {
        return this.destroy(err)
      }
      this._chunk = null
      this._debug('sent chunk from "write before connect"')

      const cb = this._cb
      this._cb = null
      cb(null)
    }

    // Backpressure is not implemented in Node.js. The `ws` module has a buggy
    // `bufferedAmount` property. See: https://github.com/websockets/ws/issues/492
    if (typeof ws !== 'function') {
      this._interval = setInterval(() => this._onInterval(), 150)
      if (this._interval.unref) this._interval.unref()
    }

    this._debug('connect')
    this.emit('connect')
  }

  // Push an incoming message to the readable side; ArrayBuffer payloads are
  // converted to Buffer first.
  _handleMessage (event) {
    if (this.destroyed) return
    let data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    this.push(data)
  }

  // The websocket closed: destroy the stream without an error.
  _handleClose () {
    if (this.destroyed) return
    this._debug('on close')
    this.destroy()
  }

  // The websocket reported an error: destroy the stream with a generic
  // connection error (the original event is not forwarded).
  _handleError (_) {
    this.destroy(new Error(`Error connecting to ${this.url}`))
  }

  // When stream finishes writing, close socket. Half open connections are not
  // supported.
  _handleFinish () {
    if (this.destroyed) return

    // Wait a bit before destroying so the socket flushes.
    // TODO: is there a more reliable way to accomplish this?
    const destroySoon = () => {
      setTimeout(() => this.destroy(), 1000)
    }

    if (this.connected) {
      destroySoon()
    } else {
      this.once('connect', destroySoon)
    }
  }

  // Backpressure poll: release the held write callback once the websocket's
  // buffered amount is back at or under the threshold.
  _onInterval () {
    if (!this._cb || !this._ws || this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
      return
    }
    this._debug('ending backpressure: bufferedAmount %d', this._ws.bufferedAmount)
    const cb = this._cb
    this._cb = null
    cb(null)
  }

  // Forward to `debug`, prefixing the format string with this socket's id.
  _debug (...args) {
    args[0] = '[' + this._id + '] ' + args[0]
    debug.apply(null, args)
  }
}
// Static flag: true when a WebSocket constructor (`_WebSocket`) was resolved
// when this module loaded.
Socket.WEBSOCKET_SUPPORT = !!_WebSocket
// CommonJS export: the module's value is the Socket class itself.
module.exports = Socket