diff --git a/lib/wire/messagestream.js b/lib/wire/messagestream.js index 043f8203..08588e58 100644 --- a/lib/wire/messagestream.js +++ b/lib/wire/messagestream.js @@ -1,7 +1,7 @@ var util = require('util') var stream = require('stream') -function MessageStream() { +function DelimitedStream() { stream.Transform.call(this) this._length = 0 this._lengthIndex = 0 @@ -9,9 +9,9 @@ function MessageStream() { this._buffer = new Buffer(0) } -util.inherits(MessageStream, stream.Transform) +util.inherits(DelimitedStream, stream.Transform) -MessageStream.prototype._transform = function(chunk, encoding, done) { +DelimitedStream.prototype._transform = function(chunk, encoding, done) { this._buffer = Buffer.concat([this._buffer, chunk]) var lo = 0 @@ -49,4 +49,34 @@ MessageStream.prototype._transform = function(chunk, encoding, done) { done() } -module.exports = MessageStream +module.exports.DelimitedStream = DelimitedStream + +function DelimitingStream() { + stream.Transform.call(this) +} + +util.inherits(DelimitingStream, stream.Transform) + +DelimitingStream.prototype._transform = function(chunk, encoding, done) { + var length = chunk.length + var lengthBytes = [] + var more = true + + while (more) { + if (length > 0x7f) { + lengthBytes.push((1 << 7) + (length & 0x7f)) + length >>= 7 + } + else { + lengthBytes.push(length) + more = false + } + } + + this.push(new Buffer(lengthBytes)) + this.push(chunk) + + done() +} + +module.exports.DelimitingStream = DelimitingStream diff --git a/test/wire/messagestream.js b/test/wire/messagestream.js index ba074c7b..265ebc76 100644 --- a/test/wire/messagestream.js +++ b/test/wire/messagestream.js @@ -3,52 +3,91 @@ var chai = require('chai') chai.use(require('sinon-chai')) var expect = chai.expect -var MessageStream = require('../../lib/wire/messagestream') +var ms = require('../../lib/wire/messagestream') describe('MessageStream', function() { - it('should emit complete varint32-delimited chunks', function() { - var ms = new MessageStream() - var spy = sinon.spy() - ms.on('data', spy) - ms.write(new Buffer([1, 0x61, 2, 0x62, 0x63])) - expect(spy).to.have.been.calledTwice - expect(spy.firstCall.args).to.eql([new Buffer([0x61])]) - expect(spy.secondCall.args).to.eql([new Buffer([0x62, 0x63])]) + describe('DelimitedStream', function() { + it('should emit complete varint-delimited chunks', function() { + var ds = new ms.DelimitedStream() + var spy = sinon.spy() + ds.on('data', spy) + ds.write(new Buffer([1, 0x61, 2, 0x62, 0x63])) + expect(spy).to.have.been.calledTwice + expect(spy.firstCall.args).to.eql([new Buffer([0x61])]) + expect(spy.secondCall.args).to.eql([new Buffer([0x62, 0x63])]) + }) + + it('should wait for more data', function() { + var ds = new ms.DelimitedStream() + var spy = sinon.spy() + ds.on('data', spy) + ds.write(new Buffer([1])) + expect(spy).to.not.have.been.called + ds.write(new Buffer([0x66])) + expect(spy).to.have.been.calledOnce + expect(spy.firstCall.args).to.eql([new Buffer([0x66])]) + }) + + it('should read varint32 properly', function() { + var ds = new ms.DelimitedStream() + var spy = sinon.spy() + ds.on('data', spy) + ds.write(new Buffer([172, 2])) // 300 + var data = new Buffer(300) + data.fill(0) + ds.write(data) + expect(spy).to.have.been.calledOnce + expect(spy.firstCall.args).to.eql([data]) + }) + + it('should emit "end"', function(done) { + var ds = new ms.DelimitedStream() + var spy = sinon.spy() + ds.on('data', sinon.spy()) + ds.on('end', spy) + ds.write(new Buffer([1])) + ds.end() + setImmediate(function() { + expect(spy).to.have.been.called + done() + }) + }) }) - it('should wait for more data', function() { - var ms = new MessageStream() - var spy = sinon.spy() - ms.on('data', spy) - ms.write(new Buffer([1])) - expect(spy).to.not.have.been.called - ms.write(new Buffer([0x66])) - expect(spy).to.have.been.calledOnce - expect(spy.firstCall.args).to.eql([new Buffer([0x66])]) - }) + describe('DelimitingStream', function() { + it('should add delimiter chunks to stream', function() { + var ds = new ms.DelimitingStream() + var spy = sinon.spy() + ds.on('data', spy) + ds.write(new Buffer([0x66, 0x6f, 0x6f])) + expect(spy).to.have.been.calledTwice + expect(spy.firstCall.args).to.eql([new Buffer([0x03])]) + expect(spy.secondCall.args).to.eql([new Buffer([0x66, 0x6f, 0x6f])]) + }) - it('should read varint32 properly', function() { - var ms = new MessageStream - var spy = sinon.spy() - ms.on('data', spy) - ms.write(new Buffer([172, 2])) // 300 - var data = new Buffer(300) - data.fill(0) - ms.write(data) - expect(spy).to.have.been.calledOnce - expect(spy.firstCall.args).to.eql([data]) - }) + it('should write proper varints', function() { + var ds = new ms.DelimitingStream() + var spy = sinon.spy() + ds.on('data', spy) + var data = new Buffer(300) + data.fill(0) + ds.write(data) + expect(spy).to.have.been.calledTwice + expect(spy.firstCall.args).to.eql([new Buffer([172, 2])]) + expect(spy.secondCall.args).to.eql([data]) + }) - it('should emit "end"', function(done) { - var ms = new MessageStream() - var spy = sinon.spy() - ms.on('data', sinon.spy()) - ms.on('end', spy) - ms.write(new Buffer([1])) - ms.end() - setImmediate(function() { - expect(spy).to.have.been.called - done() + it('should emit "end"', function(done) { + var ds = new ms.DelimitingStream() + var spy = sinon.spy() + ds.on('data', sinon.spy()) + ds.on('end', spy) + ds.write(new Buffer([1])) + ds.end() + setImmediate(function() { + expect(spy).to.have.been.called + done() + }) }) }) })