mirror of
https://github.com/openstf/stf
synced 2025-10-05 19:42:01 +02:00
Don't read new frames before the previous frame has been sent.
This commit is contained in:
parent
07fb3b530c
commit
feb77c4396
3 changed files with 97 additions and 48 deletions
|
@ -33,7 +33,10 @@ module.exports = syrup.serial()
|
|||
this.output = null
|
||||
this.socket = null
|
||||
this.banner = null
|
||||
this.parser = null
|
||||
this.frameConfig = config
|
||||
this.readable = false
|
||||
this.needsReadable = false
|
||||
}
|
||||
|
||||
util.inherits(FrameProducer, EventEmitter)
|
||||
|
@ -62,13 +65,14 @@ module.exports = syrup.serial()
|
|||
return this._connectService()
|
||||
})
|
||||
.then(function(socket) {
|
||||
this.parser = new FrameParser()
|
||||
this.socket = new RiskyStream(socket)
|
||||
.on('unexpectedEnd', this._socketEnded.bind(this))
|
||||
return this._readBanner(this.socket.stream)
|
||||
})
|
||||
.then(function(banner) {
|
||||
this.banner = banner
|
||||
return this._readFrames()
|
||||
return this._readFrames(this.socket.stream)
|
||||
})
|
||||
.then(function() {
|
||||
this.runningState = FrameProducer.STATE_STARTED
|
||||
|
@ -107,6 +111,7 @@ module.exports = syrup.serial()
|
|||
this.output = null
|
||||
this.socket = null
|
||||
this.banner = null
|
||||
this.parser = null
|
||||
this._ensureState()
|
||||
})
|
||||
}
|
||||
|
@ -162,6 +167,29 @@ module.exports = syrup.serial()
|
|||
this._configChanged()
|
||||
}
|
||||
|
||||
FrameProducer.prototype.nextFrame = function() {
|
||||
var frame = null, chunk
|
||||
|
||||
if (this.parser) {
|
||||
while ((frame = this.parser.nextFrame()) === null) {
|
||||
if ((chunk = this.socket.stream.read())) {
|
||||
this.parser.push(chunk)
|
||||
}
|
||||
else {
|
||||
this.readable = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return frame
|
||||
}
|
||||
|
||||
FrameProducer.prototype.needFrame = function() {
|
||||
this.needsReadable = true
|
||||
this._maybeEmitReadable()
|
||||
}
|
||||
|
||||
FrameProducer.prototype._configChanged = function() {
|
||||
this.restart()
|
||||
}
|
||||
|
@ -302,23 +330,25 @@ module.exports = syrup.serial()
|
|||
.catch(forceEnd)
|
||||
}
|
||||
|
||||
FrameProducer.prototype._readBanner = function(out) {
|
||||
FrameProducer.prototype._readBanner = function(socket) {
|
||||
log.info('Reading minicap banner')
|
||||
return bannerutil.read(out).timeout(2000)
|
||||
return bannerutil.read(socket).timeout(2000)
|
||||
}
|
||||
|
||||
FrameProducer.prototype._readFrames = function() {
|
||||
var parser = this.socket.stream.pipe(new FrameParser())
|
||||
var emit = this.emit.bind(this)
|
||||
FrameProducer.prototype._readFrames = function(socket) {
|
||||
this.needsReadable = true
|
||||
|
||||
function tryRead() {
|
||||
for (var frame; (frame = parser.read());) {
|
||||
emit('frame', frame)
|
||||
}
|
||||
socket.on('readable', function() {
|
||||
this.readable = true
|
||||
this._maybeEmitReadable()
|
||||
}.bind(this))
|
||||
}
|
||||
|
||||
FrameProducer.prototype._maybeEmitReadable = function() {
|
||||
if (this.readable && this.needsReadable) {
|
||||
this.needsReadable = false
|
||||
this.emit('readable')
|
||||
}
|
||||
|
||||
tryRead()
|
||||
parser.on('readable', tryRead)
|
||||
}
|
||||
|
||||
function createServer() {
|
||||
|
@ -366,12 +396,21 @@ module.exports = syrup.serial()
|
|||
frameProducer.updateRotation(newRotation)
|
||||
})
|
||||
|
||||
frameProducer.on('frame', function(frame) {
|
||||
broadcastSet.each(function(ws) {
|
||||
ws.send(frame, {
|
||||
binary: true
|
||||
})
|
||||
})
|
||||
frameProducer.on('readable', function next() {
|
||||
console.log('NEXT')
|
||||
var frame
|
||||
if ((frame = frameProducer.nextFrame())) {
|
||||
Promise.settle([broadcastSet.values().map(function(ws) {
|
||||
return new Promise(function(resolve/*, reject*/) {
|
||||
ws.send(frame, {
|
||||
binary: true
|
||||
}, resolve)
|
||||
})
|
||||
})]).then(next)
|
||||
}
|
||||
else {
|
||||
frameProducer.needFrame()
|
||||
}
|
||||
})
|
||||
|
||||
frameProducer.on('error', function(err) {
|
||||
|
|
|
@ -31,9 +31,9 @@ BroadcastSet.prototype.remove = function(id) {
|
|||
}
|
||||
}
|
||||
|
||||
BroadcastSet.prototype.each = function(fn) {
|
||||
return Object.keys(this.set).forEach(function(id) {
|
||||
return fn(this.set[id])
|
||||
BroadcastSet.prototype.values = function() {
|
||||
return Object.keys(this.set).map(function(id) {
|
||||
return this.set[id]
|
||||
}, this)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,56 +1,66 @@
|
|||
var stream = require('stream')
|
||||
var util = require('util')
|
||||
|
||||
function FrameParser() {
|
||||
this.readFrameBytes = 0
|
||||
this.frameBodyLength = 0
|
||||
this.frameBody = new Buffer(0)
|
||||
stream.Transform.call(this)
|
||||
this.frameBody = null
|
||||
this.cursor = 0
|
||||
this.chunk = null
|
||||
}
|
||||
|
||||
util.inherits(FrameParser, stream.Transform)
|
||||
FrameParser.prototype.push = function(chunk) {
|
||||
if (this.chunk) {
|
||||
throw new Error('Must consume pending frames before pushing more chunks')
|
||||
}
|
||||
|
||||
FrameParser.prototype._transform = function(chunk, encoding, done) {
|
||||
var cursor, len, bytesLeft
|
||||
this.chunk = chunk
|
||||
}
|
||||
|
||||
for (cursor = 0, len = chunk.length; cursor < len;) {
|
||||
FrameParser.prototype.nextFrame = function() {
|
||||
if (!this.chunk) {
|
||||
return null
|
||||
}
|
||||
|
||||
for (var len = this.chunk.length; this.cursor < len;) {
|
||||
if (this.readFrameBytes < 4) {
|
||||
this.frameBodyLength +=
|
||||
(chunk[cursor] << (this.readFrameBytes * 8)) >>> 0
|
||||
cursor += 1
|
||||
(this.chunk[this.cursor] << (this.readFrameBytes * 8)) >>> 0
|
||||
this.cursor += 1
|
||||
this.readFrameBytes += 1
|
||||
}
|
||||
else {
|
||||
bytesLeft = len - cursor
|
||||
var bytesLeft = len - this.cursor
|
||||
|
||||
if (bytesLeft >= this.frameBodyLength) {
|
||||
this.frameBody = Buffer.concat([
|
||||
this.frameBody
|
||||
, chunk.slice(cursor, cursor + this.frameBodyLength)
|
||||
])
|
||||
var completeBody = this.frameBody
|
||||
? Buffer.concat([
|
||||
this.frameBody
|
||||
, this.chunk.slice(this.cursor, this.cursor + this.frameBodyLength)
|
||||
])
|
||||
: this.chunk.slice(this.cursor, this.cursor + this.frameBodyLength)
|
||||
|
||||
this.push(this.frameBody)
|
||||
|
||||
cursor += this.frameBodyLength
|
||||
this.cursor += this.frameBodyLength
|
||||
this.frameBodyLength = this.readFrameBytes = 0
|
||||
this.frameBody = new Buffer(0)
|
||||
this.frameBody = null
|
||||
|
||||
return completeBody
|
||||
}
|
||||
else {
|
||||
// @todo Consider/benchmark continuation frames to prevent
|
||||
// potential Buffer thrashing.
|
||||
this.frameBody = Buffer.concat([
|
||||
this.frameBody
|
||||
, chunk.slice(cursor, len)
|
||||
])
|
||||
this.frameBody = this.frameBody
|
||||
? Buffer.concat([this.frameBody, this.chunk.slice(this.cursor, len)])
|
||||
: this.chunk.slice(this.cursor, len)
|
||||
|
||||
this.frameBodyLength -= bytesLeft
|
||||
this.readFrameBytes += bytesLeft
|
||||
cursor = len
|
||||
this.cursor = len
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return done()
|
||||
this.cursor = 0
|
||||
this.chunk = null
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
module.exports = FrameParser
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue