1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-05 10:39:25 +02:00

Add a varint-delimiting stream as well.

This commit is contained in:
Simo Kinnunen 2014-03-31 15:59:02 +09:00
parent 1c611232f5
commit 514fc3554d
2 changed files with 113 additions and 44 deletions

View file

@ -1,7 +1,7 @@
var util = require('util') var util = require('util')
var stream = require('stream') var stream = require('stream')
function MessageStream() { function DelimitedStream() {
stream.Transform.call(this) stream.Transform.call(this)
this._length = 0 this._length = 0
this._lengthIndex = 0 this._lengthIndex = 0
@ -9,9 +9,9 @@ function MessageStream() {
this._buffer = new Buffer(0) this._buffer = new Buffer(0)
} }
util.inherits(MessageStream, stream.Transform) util.inherits(DelimitedStream, stream.Transform)
MessageStream.prototype._transform = function(chunk, encoding, done) { DelimitedStream.prototype._transform = function(chunk, encoding, done) {
this._buffer = Buffer.concat([this._buffer, chunk]) this._buffer = Buffer.concat([this._buffer, chunk])
var lo = 0 var lo = 0
@ -49,4 +49,34 @@ MessageStream.prototype._transform = function(chunk, encoding, done) {
done() done()
} }
module.exports = MessageStream module.exports.DelimitedStream = DelimitedStream
function DelimitingStream() {
stream.Transform.call(this)
}
util.inherits(DelimitingStream, stream.Transform)
DelimitingStream.prototype._transform = function(chunk, encoding, done) {
var length = chunk.length
var lengthBytes = []
var more = true
while (more) {
if (length > 0x7f) {
lengthBytes.push((1 << 7) + (length & 0x7f))
length >>= 7
}
else {
lengthBytes.push(length)
more = false
}
}
this.push(new Buffer(lengthBytes))
this.push(chunk)
done()
}
module.exports.DelimitingStream = DelimitingStream

View file

@ -3,52 +3,91 @@ var chai = require('chai')
chai.use(require('sinon-chai')) chai.use(require('sinon-chai'))
var expect = chai.expect var expect = chai.expect
var MessageStream = require('../../lib/wire/messagestream') var ms = require('../../lib/wire/messagestream')
describe('MessageStream', function() { describe('MessageStream', function() {
it('should emit complete varint32-delimited chunks', function() { describe('DelimitedStream', function() {
var ms = new MessageStream() it('should emit complete varint-delimited chunks', function() {
var spy = sinon.spy() var ds = new ms.DelimitedStream()
ms.on('data', spy) var spy = sinon.spy()
ms.write(new Buffer([1, 0x61, 2, 0x62, 0x63])) ds.on('data', spy)
expect(spy).to.have.been.calledTwice ds.write(new Buffer([1, 0x61, 2, 0x62, 0x63]))
expect(spy.firstCall.args).to.eql([new Buffer([0x61])]) expect(spy).to.have.been.calledTwice
expect(spy.secondCall.args).to.eql([new Buffer([0x62, 0x63])]) expect(spy.firstCall.args).to.eql([new Buffer([0x61])])
expect(spy.secondCall.args).to.eql([new Buffer([0x62, 0x63])])
})
it('should wait for more data', function() {
var ds = new ms.DelimitedStream()
var spy = sinon.spy()
ds.on('data', spy)
ds.write(new Buffer([1]))
expect(spy).to.not.have.been.called
ds.write(new Buffer([0x66]))
expect(spy).to.have.been.calledOnce
expect(spy.firstCall.args).to.eql([new Buffer([0x66])])
})
it('should read varint32 properly', function() {
var ds = new ms.DelimitedStream()
var spy = sinon.spy()
ds.on('data', spy)
ds.write(new Buffer([172, 2])) // 300
var data = new Buffer(300)
data.fill(0)
ds.write(data)
expect(spy).to.have.been.calledOnce
expect(spy.firstCall.args).to.eql([data])
})
it('should emit "end"', function(done) {
var ds = new ms.DelimitedStream()
var spy = sinon.spy()
ds.on('data', sinon.spy())
ds.on('end', spy)
ds.write(new Buffer([1]))
ds.end()
setImmediate(function() {
expect(spy).to.have.been.called
done()
})
})
}) })
it('should wait for more data', function() { describe('DelimitingStream', function() {
var ms = new MessageStream() it('should add delimiter chunks to stream', function() {
var spy = sinon.spy() var ds = new ms.DelimitingStream()
ms.on('data', spy) var spy = sinon.spy()
ms.write(new Buffer([1])) ds.on('data', spy)
expect(spy).to.not.have.been.called ds.write(new Buffer([0x66, 0x6f, 0x6f]))
ms.write(new Buffer([0x66])) expect(spy).to.have.been.calledTwice
expect(spy).to.have.been.calledOnce expect(spy.firstCall.args).to.eql([new Buffer([0x03])])
expect(spy.firstCall.args).to.eql([new Buffer([0x66])]) expect(spy.secondCall.args).to.eql([new Buffer([0x66, 0x6f, 0x6f])])
}) })
it('should read varint32 properly', function() { it('should write proper varints', function() {
var ms = new MessageStream var ds = new ms.DelimitingStream()
var spy = sinon.spy() var spy = sinon.spy()
ms.on('data', spy) ds.on('data', spy)
ms.write(new Buffer([172, 2])) // 300 var data = new Buffer(300)
var data = new Buffer(300) data.fill(0)
data.fill(0) ds.write(data)
ms.write(data) expect(spy).to.have.been.calledTwice
expect(spy).to.have.been.calledOnce expect(spy.firstCall.args).to.eql([new Buffer([172, 2])])
expect(spy.firstCall.args).to.eql([data]) expect(spy.secondCall.args).to.eql([data])
}) })
it('should emit "end"', function(done) { it('should emit "end"', function(done) {
var ms = new MessageStream() var ds = new ms.DelimitingStream()
var spy = sinon.spy() var spy = sinon.spy()
ms.on('data', sinon.spy()) ds.on('data', sinon.spy())
ms.on('end', spy) ds.on('end', spy)
ms.write(new Buffer([1])) ds.write(new Buffer([1]))
ms.end() ds.end()
setImmediate(function() { setImmediate(function() {
expect(spy).to.have.been.called expect(spy).to.have.been.called
done() done()
})
}) })
}) })
}) })