stream.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. 'use strict';
  2. const fs = require('fs');
  3. const path = require('path');
  4. const stream = require('stream');
  5. const tar = require('tar-stream');
  6. const utils = require('../utils');
  7. const BaseStream = require('../base_stream');
  8. class TarStream extends BaseStream {
  9. constructor(opts) {
  10. super(opts);
  11. this._waitingEntries = [];
  12. this._processing = false;
  13. this._init(opts);
  14. }
  15. _init() {
  16. const pack = this._pack = tar.pack();
  17. pack.on('end', () => this.push(null));
  18. pack.on('data', chunk => this.push(chunk));
  19. pack.on('error', err => this.emit('error', err));
  20. }
  21. addEntry(entry, opts) {
  22. if (this._processing) {
  23. return this._waitingEntries.push([ entry, opts ]);
  24. }
  25. opts = opts || {};
  26. this._processing = true;
  27. const entryType = utils.entryType(entry);
  28. if (!entryType) return; // TODO
  29. if (entryType === 'fileOrDir') {
  30. this._addFileOrDirEntry(entry, opts);
  31. } else if (entryType === 'buffer') {
  32. this._addBufferEntry(entry, opts);
  33. } else { // stream
  34. this._addStreamEntry(entry, opts);
  35. }
  36. }
  37. _addFileOrDirEntry(entry, opts) {
  38. fs.stat(entry, (err, stat) => {
  39. if (err) return this.emit('error', err);
  40. if (stat.isDirectory()) return this._addDirEntry(entry, opts);
  41. if (stat.isFile()) return this._addFileEntry(entry, opts);
  42. const illigalEntryError = new Error('Type is not supported, must be a file path, directory path, file buffer, or a readable stream');
  43. illigalEntryError.name = 'IlligalEntryError';
  44. this.emit('error', illigalEntryError);
  45. });
  46. }
  47. _addFileEntry(entry, opts) {
  48. // stat file to get file size
  49. fs.stat(entry, (err, stat) => {
  50. if (err) return this.emit('error', err);
  51. const entryStream = this._pack.entry({ name: opts.relativePath || path.basename(entry), size: stat.size, mode: stat.mode & 0o777 }, this._onEntryFinish.bind(this));
  52. const stream = fs.createReadStream(entry, opts.fs);
  53. stream.on('error', err => this.emit('error', err));
  54. stream.pipe(entryStream);
  55. });
  56. }
  57. _addDirEntry(entry, opts) {
  58. fs.readdir(entry, (err, files) => {
  59. if (err) return this.emit('error', err);
  60. const relativePath = opts.relativePath || '';
  61. files.forEach(fileOrDir => {
  62. const newOpts = utils.clone(opts);
  63. if (opts.ignoreBase) {
  64. newOpts.relativePath = path.posix.join(relativePath, fileOrDir);
  65. } else {
  66. newOpts.relativePath = path.posix.join(relativePath, path.basename(entry), fileOrDir);
  67. }
  68. newOpts.ignoreBase = true;
  69. this.addEntry(path.posix.join(entry, fileOrDir), newOpts);
  70. });
  71. this._onEntryFinish();
  72. });
  73. }
  74. _addBufferEntry(entry, opts) {
  75. if (!opts.relativePath) return this.emit('error', 'opts.relativePath is required if entry is a buffer');
  76. this._pack.entry({ name: opts.relativePath }, entry, this._onEntryFinish.bind(this));
  77. }
  78. _addStreamEntry(entry, opts) {
  79. entry.on('error', err => this.emit('error', err));
  80. if (!opts.relativePath) return this.emit('error', new Error('opts.relativePath is required'));
  81. if (opts.size) {
  82. const entryStream = this._pack.entry({ name: opts.relativePath, size: opts.size }, this._onEntryFinish.bind(this));
  83. entry.pipe(entryStream);
  84. } else {
  85. if (!opts.suppressSizeWarning) {
  86. console.warn('You should specify the size of streamming data by opts.size to prevent all streaming data from loading into memory. If you are sure about memory cost, pass opts.suppressSizeWarning: true to suppress this warning');
  87. }
  88. const buf = [];
  89. const collectStream = new stream.Writable({
  90. write(chunk, _, callback) {
  91. buf.push(chunk);
  92. callback();
  93. },
  94. });
  95. collectStream.on('error', err => this.emit('error', err));
  96. collectStream.on('finish', () => {
  97. this._pack.entry({ name: opts.relativePath }, Buffer.concat(buf), this._onEntryFinish.bind(this));
  98. });
  99. entry.pipe(collectStream);
  100. }
  101. }
  102. _read() {}
  103. _onEntryFinish(err) {
  104. if (err) return this.emit('error', err);
  105. this._processing = false;
  106. const waitingEntry = this._waitingEntries.shift();
  107. if (waitingEntry) {
  108. this.addEntry.apply(this, waitingEntry);
  109. } else {
  110. this._finalize();
  111. }
  112. }
  113. _finalize() {
  114. this._pack.finalize();
  115. }
  116. }
  117. module.exports = TarStream;