var util = require('util') var stream = require('stream') function DelimitedStream() { stream.Transform.call(this) this._length = 0 this._lengthIndex = 0 this._readingLength = true this._buffer = new Buffer(0) } util.inherits(DelimitedStream, stream.Transform) DelimitedStream.prototype._transform = function(chunk, encoding, done) { this._buffer = Buffer.concat([this._buffer, chunk]) while (this._buffer.length) { if (this._readingLength) { var byte = this._buffer[0] this._length += (byte & 0x7f) << (7 * this._lengthIndex) if (byte & (1 << 7)) { this._lengthIndex += 1 this._readingLength = true } else { this._lengthIndex = 0 this._readingLength = false } this._buffer = this._buffer.slice(1) } else { if (this._length <= this._buffer.length) { this.push(this._buffer.slice(0, this._length)) this._buffer = this._buffer.slice(this._length) this._length = 0 this._readingLength = true } else { // Wait for more chunks break } } } done() } module.exports.DelimitedStream = DelimitedStream function DelimitingStream() { stream.Transform.call(this) } util.inherits(DelimitingStream, stream.Transform) DelimitingStream.prototype._transform = function(chunk, encoding, done) { var length = chunk.length var lengthBytes = [] while (length > 0x7f) { lengthBytes.push((1 << 7) + (length & 0x7f)) length >>= 7 } lengthBytes.push(length) this.push(new Buffer(lengthBytes)) this.push(chunk) done() } module.exports.DelimitingStream = DelimitingStream