123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- 'use strict';
- const pTimeout = require('p-timeout');
- const symbolAsyncIterator = Symbol.asyncIterator || '@@asyncIterator';
- const normalizeEmitter = emitter => {
- const addListener = emitter.on || emitter.addListener || emitter.addEventListener;
- const removeListener = emitter.off || emitter.removeListener || emitter.removeEventListener;
- if (!addListener || !removeListener) {
- throw new TypeError('Emitter is not compatible');
- }
- return {
- addListener: addListener.bind(emitter),
- removeListener: removeListener.bind(emitter)
- };
- };
- const normalizeEvents = event => Array.isArray(event) ? event : [event];
- const multiple = (emitter, event, options) => {
- let cancel;
- const ret = new Promise((resolve, reject) => {
- options = Object.assign({
- rejectionEvents: ['error'],
- multiArgs: false,
- resolveImmediately: false
- }, options);
- if (!(options.count >= 0 && (options.count === Infinity || Number.isInteger(options.count)))) {
- throw new TypeError('The `count` option should be at least 0 or more');
- }
- // Allow multiple events
- const events = normalizeEvents(event);
- const items = [];
- const {addListener, removeListener} = normalizeEmitter(emitter);
- const onItem = (...args) => {
- const value = options.multiArgs ? args : args[0];
- if (options.filter && !options.filter(value)) {
- return;
- }
- items.push(value);
- if (options.count === items.length) {
- cancel();
- resolve(items);
- }
- };
- const rejectHandler = error => {
- cancel();
- reject(error);
- };
- cancel = () => {
- for (const event of events) {
- removeListener(event, onItem);
- }
- for (const rejectionEvent of options.rejectionEvents) {
- removeListener(rejectionEvent, rejectHandler);
- }
- };
- for (const event of events) {
- addListener(event, onItem);
- }
- for (const rejectionEvent of options.rejectionEvents) {
- addListener(rejectionEvent, rejectHandler);
- }
- if (options.resolveImmediately) {
- resolve(items);
- }
- });
- ret.cancel = cancel;
- if (typeof options.timeout === 'number') {
- const timeout = pTimeout(ret, options.timeout);
- timeout.cancel = cancel;
- return timeout;
- }
- return ret;
- };
- module.exports = (emitter, event, options) => {
- if (typeof options === 'function') {
- options = {filter: options};
- }
- options = Object.assign({}, options, {
- count: 1,
- resolveImmediately: false
- });
- const arrayPromise = multiple(emitter, event, options);
- const promise = arrayPromise.then(array => array[0]);
- promise.cancel = arrayPromise.cancel;
- return promise;
- };
- module.exports.multiple = multiple;
- module.exports.iterator = (emitter, event, options) => {
- if (typeof options === 'function') {
- options = {filter: options};
- }
- // Allow multiple events
- const events = normalizeEvents(event);
- options = Object.assign({
- rejectionEvents: ['error'],
- resolutionEvents: [],
- limit: Infinity,
- multiArgs: false
- }, options);
- const {limit} = options;
- const isValidLimit = limit >= 0 && (limit === Infinity || Number.isInteger(limit));
- if (!isValidLimit) {
- throw new TypeError('The `limit` option should be a non-negative integer or Infinity');
- }
- if (limit === 0) {
- // Return an empty async iterator to avoid any further cost
- return {
- [Symbol.asyncIterator]() {
- return this;
- },
- next() {
- return Promise.resolve({done: true, value: undefined});
- }
- };
- }
- let isLimitReached = false;
- const {addListener, removeListener} = normalizeEmitter(emitter);
- let done = false;
- let error;
- let hasPendingError = false;
- const nextQueue = [];
- const valueQueue = [];
- let eventCount = 0;
- const valueHandler = (...args) => {
- eventCount++;
- isLimitReached = eventCount === limit;
- const value = options.multiArgs ? args : args[0];
- if (nextQueue.length > 0) {
- const {resolve} = nextQueue.shift();
- resolve({done: false, value});
- if (isLimitReached) {
- cancel();
- }
- return;
- }
- valueQueue.push(value);
- if (isLimitReached) {
- cancel();
- }
- };
- const cancel = () => {
- done = true;
- for (const event of events) {
- removeListener(event, valueHandler);
- }
- for (const rejectionEvent of options.rejectionEvents) {
- removeListener(rejectionEvent, rejectHandler);
- }
- for (const resolutionEvent of options.resolutionEvents) {
- removeListener(resolutionEvent, resolveHandler);
- }
- while (nextQueue.length > 0) {
- const {resolve} = nextQueue.shift();
- resolve({done: true, value: undefined});
- }
- };
- const rejectHandler = (...args) => {
- error = options.multiArgs ? args : args[0];
- if (nextQueue.length > 0) {
- const {reject} = nextQueue.shift();
- reject(error);
- } else {
- hasPendingError = true;
- }
- cancel();
- };
- const resolveHandler = (...args) => {
- const value = options.multiArgs ? args : args[0];
- if (options.filter && !options.filter(value)) {
- return;
- }
- if (nextQueue.length > 0) {
- const {resolve} = nextQueue.shift();
- resolve({done: true, value});
- } else {
- valueQueue.push(value);
- }
- cancel();
- };
- for (const event of events) {
- addListener(event, valueHandler);
- }
- for (const rejectionEvent of options.rejectionEvents) {
- addListener(rejectionEvent, rejectHandler);
- }
- for (const resolutionEvent of options.resolutionEvents) {
- addListener(resolutionEvent, resolveHandler);
- }
- return {
- [symbolAsyncIterator]() {
- return this;
- },
- next() {
- if (valueQueue.length > 0) {
- const value = valueQueue.shift();
- return Promise.resolve({done: done && valueQueue.length === 0 && !isLimitReached, value});
- }
- if (hasPendingError) {
- hasPendingError = false;
- return Promise.reject(error);
- }
- if (done) {
- return Promise.resolve({done: true, value: undefined});
- }
- return new Promise((resolve, reject) => nextQueue.push({resolve, reject}));
- },
- return(value) {
- cancel();
- return Promise.resolve({done, value});
- }
- };
- };
|