mirror of
https://github.com/openstf/stf
synced 2025-10-04 10:19:30 +02:00
Fix major oversight in MessageStream that was causing multi-chunk messages to fail.
This commit is contained in:
parent
09eb8c539d
commit
2bf8f2352b
2 changed files with 29 additions and 27 deletions
|
@ -14,35 +14,33 @@ util.inherits(DelimitedStream, stream.Transform)
|
||||||
DelimitedStream.prototype._transform = function(chunk, encoding, done) {
|
DelimitedStream.prototype._transform = function(chunk, encoding, done) {
|
||||||
this._buffer = Buffer.concat([this._buffer, chunk])
|
this._buffer = Buffer.concat([this._buffer, chunk])
|
||||||
|
|
||||||
var lo = 0
|
while (this._buffer.length) {
|
||||||
var hi = this._buffer.length
|
|
||||||
|
|
||||||
while (lo < hi) {
|
|
||||||
if (this._readingLength) {
|
if (this._readingLength) {
|
||||||
while (lo < hi) {
|
var byte = this._buffer[0]
|
||||||
var byte = this._buffer[lo++]
|
if (byte & (1 << 7)) {
|
||||||
if (byte & (1 << 7)) {
|
this._length += (byte & 0x7f) << (7 * this._lengthIndex)
|
||||||
this._length += (byte & 0x7f) << (7 * this._lengthIndex)
|
this._lengthIndex += 1
|
||||||
this._lengthIndex += 1
|
this._readingLength = true
|
||||||
this._readingLength = true
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
this._length += (byte & 0x7f) << (7 * this._lengthIndex)
|
|
||||||
this._lengthIndex = 0
|
|
||||||
this._readingLength = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
this._length += (byte & 0x7f) << (7 * this._lengthIndex)
|
||||||
|
this._lengthIndex = 0
|
||||||
|
this._readingLength = false
|
||||||
|
}
|
||||||
|
this._buffer = this._buffer.slice(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this._readingLength && lo + this._length <= hi) {
|
if (!this._readingLength) {
|
||||||
this.push(chunk.slice(lo, lo + this._length))
|
if (this._length <= this._buffer.length) {
|
||||||
lo += this._length
|
this.push(this._buffer.slice(0, this._length))
|
||||||
this._length = 0
|
this._buffer = this._buffer.slice(this._length)
|
||||||
this._readingLength = true
|
this._length = 0
|
||||||
}
|
this._readingLength = true
|
||||||
else {
|
}
|
||||||
break
|
else {
|
||||||
|
// Wait for more chunks
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,11 +21,15 @@ describe('MessageStream', function() {
|
||||||
var ds = new ms.DelimitedStream()
|
var ds = new ms.DelimitedStream()
|
||||||
var spy = sinon.spy()
|
var spy = sinon.spy()
|
||||||
ds.on('data', spy)
|
ds.on('data', spy)
|
||||||
ds.write(new Buffer([1]))
|
ds.write(new Buffer([3]))
|
||||||
expect(spy).to.not.have.been.called
|
expect(spy).to.not.have.been.called
|
||||||
ds.write(new Buffer([0x66]))
|
ds.write(new Buffer([0x66]))
|
||||||
|
expect(spy).to.not.have.been.called
|
||||||
|
ds.write(new Buffer([0x65]))
|
||||||
|
expect(spy).to.not.have.been.called
|
||||||
|
ds.write(new Buffer([0x64]))
|
||||||
expect(spy).to.have.been.calledOnce
|
expect(spy).to.have.been.calledOnce
|
||||||
expect(spy.firstCall.args).to.eql([new Buffer([0x66])])
|
expect(spy.firstCall.args).to.eql([new Buffer([0x66, 0x65, 0x64])])
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should read varint32 properly', function() {
|
it('should read varint32 properly', function() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue