123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- "use strict";
- let util = require("util");
- let Stream = require("stream");
- let ChunkStream = (module.exports = function () {
- Stream.call(this);
- this._buffers = [];
- this._buffered = 0;
- this._reads = [];
- this._paused = false;
- this._encoding = "utf8";
- this.writable = true;
- });
- util.inherits(ChunkStream, Stream);
- ChunkStream.prototype.read = function (length, callback) {
- this._reads.push({
- length: Math.abs(length), // if length < 0 then at most this length
- allowLess: length < 0,
- func: callback,
- });
- process.nextTick(
- function () {
- this._process();
- // its paused and there is not enought data then ask for more
- if (this._paused && this._reads && this._reads.length > 0) {
- this._paused = false;
- this.emit("drain");
- }
- }.bind(this)
- );
- };
- ChunkStream.prototype.write = function (data, encoding) {
- if (!this.writable) {
- this.emit("error", new Error("Stream not writable"));
- return false;
- }
- let dataBuffer;
- if (Buffer.isBuffer(data)) {
- dataBuffer = data;
- } else {
- dataBuffer = Buffer.from(data, encoding || this._encoding);
- }
- this._buffers.push(dataBuffer);
- this._buffered += dataBuffer.length;
- this._process();
- // ok if there are no more read requests
- if (this._reads && this._reads.length === 0) {
- this._paused = true;
- }
- return this.writable && !this._paused;
- };
- ChunkStream.prototype.end = function (data, encoding) {
- if (data) {
- this.write(data, encoding);
- }
- this.writable = false;
- // already destroyed
- if (!this._buffers) {
- return;
- }
- // enqueue or handle end
- if (this._buffers.length === 0) {
- this._end();
- } else {
- this._buffers.push(null);
- this._process();
- }
- };
- ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
- ChunkStream.prototype._end = function () {
- if (this._reads.length > 0) {
- this.emit("error", new Error("Unexpected end of input"));
- }
- this.destroy();
- };
- ChunkStream.prototype.destroy = function () {
- if (!this._buffers) {
- return;
- }
- this.writable = false;
- this._reads = null;
- this._buffers = null;
- this.emit("close");
- };
- ChunkStream.prototype._processReadAllowingLess = function (read) {
- // ok there is any data so that we can satisfy this request
- this._reads.shift(); // == read
- // first we need to peek into first buffer
- let smallerBuf = this._buffers[0];
- // ok there is more data than we need
- if (smallerBuf.length > read.length) {
- this._buffered -= read.length;
- this._buffers[0] = smallerBuf.slice(read.length);
- read.func.call(this, smallerBuf.slice(0, read.length));
- } else {
- // ok this is less than maximum length so use it all
- this._buffered -= smallerBuf.length;
- this._buffers.shift(); // == smallerBuf
- read.func.call(this, smallerBuf);
- }
- };
- ChunkStream.prototype._processRead = function (read) {
- this._reads.shift(); // == read
- let pos = 0;
- let count = 0;
- let data = Buffer.alloc(read.length);
- // create buffer for all data
- while (pos < read.length) {
- let buf = this._buffers[count++];
- let len = Math.min(buf.length, read.length - pos);
- buf.copy(data, pos, 0, len);
- pos += len;
- // last buffer wasn't used all so just slice it and leave
- if (len !== buf.length) {
- this._buffers[--count] = buf.slice(len);
- }
- }
- // remove all used buffers
- if (count > 0) {
- this._buffers.splice(0, count);
- }
- this._buffered -= read.length;
- read.func.call(this, data);
- };
- ChunkStream.prototype._process = function () {
- try {
- // as long as there is any data and read requests
- while (this._buffered > 0 && this._reads && this._reads.length > 0) {
- let read = this._reads[0];
- // read any data (but no more than length)
- if (read.allowLess) {
- this._processReadAllowingLess(read);
- } else if (this._buffered >= read.length) {
- // ok we can meet some expectations
- this._processRead(read);
- } else {
- // not enought data to satisfy first request in queue
- // so we need to wait for more
- break;
- }
- }
- if (this._buffers && !this.writable) {
- this._end();
- }
- } catch (ex) {
- this.emit("error", ex);
- }
- };
|