chunkstream.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. "use strict";
  2. let util = require("util");
  3. let Stream = require("stream");
  4. let ChunkStream = (module.exports = function () {
  5. Stream.call(this);
  6. this._buffers = [];
  7. this._buffered = 0;
  8. this._reads = [];
  9. this._paused = false;
  10. this._encoding = "utf8";
  11. this.writable = true;
  12. });
  13. util.inherits(ChunkStream, Stream);
  14. ChunkStream.prototype.read = function (length, callback) {
  15. this._reads.push({
  16. length: Math.abs(length), // if length < 0 then at most this length
  17. allowLess: length < 0,
  18. func: callback,
  19. });
  20. process.nextTick(
  21. function () {
  22. this._process();
  23. // its paused and there is not enought data then ask for more
  24. if (this._paused && this._reads && this._reads.length > 0) {
  25. this._paused = false;
  26. this.emit("drain");
  27. }
  28. }.bind(this)
  29. );
  30. };
  31. ChunkStream.prototype.write = function (data, encoding) {
  32. if (!this.writable) {
  33. this.emit("error", new Error("Stream not writable"));
  34. return false;
  35. }
  36. let dataBuffer;
  37. if (Buffer.isBuffer(data)) {
  38. dataBuffer = data;
  39. } else {
  40. dataBuffer = Buffer.from(data, encoding || this._encoding);
  41. }
  42. this._buffers.push(dataBuffer);
  43. this._buffered += dataBuffer.length;
  44. this._process();
  45. // ok if there are no more read requests
  46. if (this._reads && this._reads.length === 0) {
  47. this._paused = true;
  48. }
  49. return this.writable && !this._paused;
  50. };
  51. ChunkStream.prototype.end = function (data, encoding) {
  52. if (data) {
  53. this.write(data, encoding);
  54. }
  55. this.writable = false;
  56. // already destroyed
  57. if (!this._buffers) {
  58. return;
  59. }
  60. // enqueue or handle end
  61. if (this._buffers.length === 0) {
  62. this._end();
  63. } else {
  64. this._buffers.push(null);
  65. this._process();
  66. }
  67. };
  68. ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
  69. ChunkStream.prototype._end = function () {
  70. if (this._reads.length > 0) {
  71. this.emit("error", new Error("Unexpected end of input"));
  72. }
  73. this.destroy();
  74. };
  75. ChunkStream.prototype.destroy = function () {
  76. if (!this._buffers) {
  77. return;
  78. }
  79. this.writable = false;
  80. this._reads = null;
  81. this._buffers = null;
  82. this.emit("close");
  83. };
  84. ChunkStream.prototype._processReadAllowingLess = function (read) {
  85. // ok there is any data so that we can satisfy this request
  86. this._reads.shift(); // == read
  87. // first we need to peek into first buffer
  88. let smallerBuf = this._buffers[0];
  89. // ok there is more data than we need
  90. if (smallerBuf.length > read.length) {
  91. this._buffered -= read.length;
  92. this._buffers[0] = smallerBuf.slice(read.length);
  93. read.func.call(this, smallerBuf.slice(0, read.length));
  94. } else {
  95. // ok this is less than maximum length so use it all
  96. this._buffered -= smallerBuf.length;
  97. this._buffers.shift(); // == smallerBuf
  98. read.func.call(this, smallerBuf);
  99. }
  100. };
  101. ChunkStream.prototype._processRead = function (read) {
  102. this._reads.shift(); // == read
  103. let pos = 0;
  104. let count = 0;
  105. let data = Buffer.alloc(read.length);
  106. // create buffer for all data
  107. while (pos < read.length) {
  108. let buf = this._buffers[count++];
  109. let len = Math.min(buf.length, read.length - pos);
  110. buf.copy(data, pos, 0, len);
  111. pos += len;
  112. // last buffer wasn't used all so just slice it and leave
  113. if (len !== buf.length) {
  114. this._buffers[--count] = buf.slice(len);
  115. }
  116. }
  117. // remove all used buffers
  118. if (count > 0) {
  119. this._buffers.splice(0, count);
  120. }
  121. this._buffered -= read.length;
  122. read.func.call(this, data);
  123. };
  124. ChunkStream.prototype._process = function () {
  125. try {
  126. // as long as there is any data and read requests
  127. while (this._buffered > 0 && this._reads && this._reads.length > 0) {
  128. let read = this._reads[0];
  129. // read any data (but no more than length)
  130. if (read.allowLess) {
  131. this._processReadAllowingLess(read);
  132. } else if (this._buffered >= read.length) {
  133. // ok we can meet some expectations
  134. this._processRead(read);
  135. } else {
  136. // not enought data to satisfy first request in queue
  137. // so we need to wait for more
  138. break;
  139. }
  140. }
  141. if (this._buffers && !this.writable) {
  142. this._end();
  143. }
  144. } catch (ex) {
  145. this.emit("error", ex);
  146. }
  147. };