From 1c611232f534a670a7376bd70a085bb3b9092d6f Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Mon, 31 Mar 2014 15:21:36 +0900 Subject: [PATCH] Add utility for reading varint32-delimited streams. --- lib/wire/messagestream.js | 52 ++++++++++++++++++++++++++++++++++++ test/wire/messagestream.js | 54 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 lib/wire/messagestream.js create mode 100644 test/wire/messagestream.js diff --git a/lib/wire/messagestream.js b/lib/wire/messagestream.js new file mode 100644 index 00000000..043f8203 --- /dev/null +++ b/lib/wire/messagestream.js @@ -0,0 +1,52 @@ +var util = require('util') +var stream = require('stream') + +function MessageStream() { + stream.Transform.call(this) + this._length = 0 + this._lengthIndex = 0 + this._readingLength = true + this._buffer = new Buffer(0) +} + +util.inherits(MessageStream, stream.Transform) + +MessageStream.prototype._transform = function(chunk, encoding, done) { + this._buffer = Buffer.concat([this._buffer, chunk]) + + var lo = 0 + var hi = this._buffer.length + + while (lo < hi) { + if (this._readingLength) { + while (lo < hi) { + var byte = this._buffer[lo++] + if (byte & (1 << 7)) { + this._length += (byte & 0x7f) << (7 * this._lengthIndex) + this._lengthIndex += 1 + this._readingLength = true + } + else { + this._length += (byte & 0x7f) << (7 * this._lengthIndex) + this._lengthIndex = 0 + this._readingLength = false + break + } + } + } + + if (!this._readingLength && lo + this._length <= hi) { + this.push(chunk.slice(lo, lo + this._length)) + lo += this._length + this._length = 0 + this._readingLength = true + } + else { + break + } + } + + done() +} + +module.exports = MessageStream diff --git a/test/wire/messagestream.js b/test/wire/messagestream.js new file mode 100644 index 00000000..ba074c7b --- /dev/null +++ b/test/wire/messagestream.js @@ -0,0 +1,54 @@ +var sinon = require('sinon') +var chai = require('chai') +chai.use(require('sinon-chai')) +var expect = chai.expect + +var MessageStream = require('../../lib/wire/messagestream') + +describe('MessageStream', function() { + it('should emit complete varint32-delimited chunks', function() { + var ms = new MessageStream() + var spy = sinon.spy() + ms.on('data', spy) + ms.write(new Buffer([1, 0x61, 2, 0x62, 0x63])) + expect(spy).to.have.been.calledTwice + expect(spy.firstCall.args).to.eql([new Buffer([0x61])]) + expect(spy.secondCall.args).to.eql([new Buffer([0x62, 0x63])]) + }) + + it('should wait for more data', function() { + var ms = new MessageStream() + var spy = sinon.spy() + ms.on('data', spy) + ms.write(new Buffer([1])) + expect(spy).to.not.have.been.called + ms.write(new Buffer([0x66])) + expect(spy).to.have.been.calledOnce + expect(spy.firstCall.args).to.eql([new Buffer([0x66])]) + }) + + it('should read varint32 properly', function() { + var ms = new MessageStream + var spy = sinon.spy() + ms.on('data', spy) + ms.write(new Buffer([172, 2])) // 300 + var data = new Buffer(300) + data.fill(0) + ms.write(data) + expect(spy).to.have.been.calledOnce + expect(spy.firstCall.args).to.eql([data]) + }) + + it('should emit "end"', function(done) { + var ms = new MessageStream() + var spy = sinon.spy() + ms.on('data', sinon.spy()) + ms.on('end', spy) + ms.write(new Buffer([1])) + ms.end() + setImmediate(function() { + expect(spy).to.have.been.called + done() + }) + }) +})