index.js 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. 'use strict';
  2. const pTimeout = require('p-timeout');
  3. const symbolAsyncIterator = Symbol.asyncIterator || '@@asyncIterator';
  4. const normalizeEmitter = emitter => {
  5. const addListener = emitter.on || emitter.addListener || emitter.addEventListener;
  6. const removeListener = emitter.off || emitter.removeListener || emitter.removeEventListener;
  7. if (!addListener || !removeListener) {
  8. throw new TypeError('Emitter is not compatible');
  9. }
  10. return {
  11. addListener: addListener.bind(emitter),
  12. removeListener: removeListener.bind(emitter)
  13. };
  14. };
  15. const normalizeEvents = event => Array.isArray(event) ? event : [event];
  16. const multiple = (emitter, event, options) => {
  17. let cancel;
  18. const ret = new Promise((resolve, reject) => {
  19. options = Object.assign({
  20. rejectionEvents: ['error'],
  21. multiArgs: false,
  22. resolveImmediately: false
  23. }, options);
  24. if (!(options.count >= 0 && (options.count === Infinity || Number.isInteger(options.count)))) {
  25. throw new TypeError('The `count` option should be at least 0 or more');
  26. }
  27. // Allow multiple events
  28. const events = normalizeEvents(event);
  29. const items = [];
  30. const {addListener, removeListener} = normalizeEmitter(emitter);
  31. const onItem = (...args) => {
  32. const value = options.multiArgs ? args : args[0];
  33. if (options.filter && !options.filter(value)) {
  34. return;
  35. }
  36. items.push(value);
  37. if (options.count === items.length) {
  38. cancel();
  39. resolve(items);
  40. }
  41. };
  42. const rejectHandler = error => {
  43. cancel();
  44. reject(error);
  45. };
  46. cancel = () => {
  47. for (const event of events) {
  48. removeListener(event, onItem);
  49. }
  50. for (const rejectionEvent of options.rejectionEvents) {
  51. removeListener(rejectionEvent, rejectHandler);
  52. }
  53. };
  54. for (const event of events) {
  55. addListener(event, onItem);
  56. }
  57. for (const rejectionEvent of options.rejectionEvents) {
  58. addListener(rejectionEvent, rejectHandler);
  59. }
  60. if (options.resolveImmediately) {
  61. resolve(items);
  62. }
  63. });
  64. ret.cancel = cancel;
  65. if (typeof options.timeout === 'number') {
  66. const timeout = pTimeout(ret, options.timeout);
  67. timeout.cancel = cancel;
  68. return timeout;
  69. }
  70. return ret;
  71. };
  72. module.exports = (emitter, event, options) => {
  73. if (typeof options === 'function') {
  74. options = {filter: options};
  75. }
  76. options = Object.assign({}, options, {
  77. count: 1,
  78. resolveImmediately: false
  79. });
  80. const arrayPromise = multiple(emitter, event, options);
  81. const promise = arrayPromise.then(array => array[0]);
  82. promise.cancel = arrayPromise.cancel;
  83. return promise;
  84. };
  85. module.exports.multiple = multiple;
  86. module.exports.iterator = (emitter, event, options) => {
  87. if (typeof options === 'function') {
  88. options = {filter: options};
  89. }
  90. // Allow multiple events
  91. const events = normalizeEvents(event);
  92. options = Object.assign({
  93. rejectionEvents: ['error'],
  94. resolutionEvents: [],
  95. limit: Infinity,
  96. multiArgs: false
  97. }, options);
  98. const {limit} = options;
  99. const isValidLimit = limit >= 0 && (limit === Infinity || Number.isInteger(limit));
  100. if (!isValidLimit) {
  101. throw new TypeError('The `limit` option should be a non-negative integer or Infinity');
  102. }
  103. if (limit === 0) {
  104. // Return an empty async iterator to avoid any further cost
  105. return {
  106. [Symbol.asyncIterator]() {
  107. return this;
  108. },
  109. next() {
  110. return Promise.resolve({done: true, value: undefined});
  111. }
  112. };
  113. }
  114. let isLimitReached = false;
  115. const {addListener, removeListener} = normalizeEmitter(emitter);
  116. let done = false;
  117. let error;
  118. let hasPendingError = false;
  119. const nextQueue = [];
  120. const valueQueue = [];
  121. let eventCount = 0;
  122. const valueHandler = (...args) => {
  123. eventCount++;
  124. isLimitReached = eventCount === limit;
  125. const value = options.multiArgs ? args : args[0];
  126. if (nextQueue.length > 0) {
  127. const {resolve} = nextQueue.shift();
  128. resolve({done: false, value});
  129. if (isLimitReached) {
  130. cancel();
  131. }
  132. return;
  133. }
  134. valueQueue.push(value);
  135. if (isLimitReached) {
  136. cancel();
  137. }
  138. };
  139. const cancel = () => {
  140. done = true;
  141. for (const event of events) {
  142. removeListener(event, valueHandler);
  143. }
  144. for (const rejectionEvent of options.rejectionEvents) {
  145. removeListener(rejectionEvent, rejectHandler);
  146. }
  147. for (const resolutionEvent of options.resolutionEvents) {
  148. removeListener(resolutionEvent, resolveHandler);
  149. }
  150. while (nextQueue.length > 0) {
  151. const {resolve} = nextQueue.shift();
  152. resolve({done: true, value: undefined});
  153. }
  154. };
  155. const rejectHandler = (...args) => {
  156. error = options.multiArgs ? args : args[0];
  157. if (nextQueue.length > 0) {
  158. const {reject} = nextQueue.shift();
  159. reject(error);
  160. } else {
  161. hasPendingError = true;
  162. }
  163. cancel();
  164. };
  165. const resolveHandler = (...args) => {
  166. const value = options.multiArgs ? args : args[0];
  167. if (options.filter && !options.filter(value)) {
  168. return;
  169. }
  170. if (nextQueue.length > 0) {
  171. const {resolve} = nextQueue.shift();
  172. resolve({done: true, value});
  173. } else {
  174. valueQueue.push(value);
  175. }
  176. cancel();
  177. };
  178. for (const event of events) {
  179. addListener(event, valueHandler);
  180. }
  181. for (const rejectionEvent of options.rejectionEvents) {
  182. addListener(rejectionEvent, rejectHandler);
  183. }
  184. for (const resolutionEvent of options.resolutionEvents) {
  185. addListener(resolutionEvent, resolveHandler);
  186. }
  187. return {
  188. [symbolAsyncIterator]() {
  189. return this;
  190. },
  191. next() {
  192. if (valueQueue.length > 0) {
  193. const value = valueQueue.shift();
  194. return Promise.resolve({done: done && valueQueue.length === 0 && !isLimitReached, value});
  195. }
  196. if (hasPendingError) {
  197. hasPendingError = false;
  198. return Promise.reject(error);
  199. }
  200. if (done) {
  201. return Promise.resolve({done: true, value: undefined});
  202. }
  203. return new Promise((resolve, reject) => nextQueue.push({resolve, reject}));
  204. },
  205. return(value) {
  206. cancel();
  207. return Promise.resolve({done, value});
  208. }
  209. };
  210. };