Source: collections/Queue.js

  1. const { Collection, CollectionEvent } = require('./Collection')
  2. , { EqualityComparer } = require('./EqualityComparer')
  3. , { Resolve } = require('../tools/Resolve')
  4. , { defer } = require('../tools/Defer')
  5. , { Observable, fromEvent} = require('rxjs')
  6. , symbolQueueEnqueue = Symbol('queueEnqueue')
  7. , symbolQueueDequeue = Symbol('queueDequeue')
  8. , symbolQueueTakeOut = Symbol('queueTakeOut');
  9. /**
  10. * @template T
  11. * @extends {Collection<T>}
  12. * @author Sebastian Hönel <development@hoenel.net>
  13. */
  14. class Queue extends Collection {
  15. /**
  16. * Creates a new, empty Queue<T>.
  17. *
  18. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  19. */
  20. constructor(eqComparer = EqualityComparer.default) {
  21. super(eqComparer);
  22. /** @type {Observable<T>} */
  23. this.observableEnqueue = Object.freeze(fromEvent(this, symbolQueueEnqueue));
  24. /** @type {Observable<T>} */
  25. this.observableDequeue = Object.freeze(fromEvent(this, symbolQueueDequeue));
  26. };
  27. /**
  28. * @param {T} item The item to add at the end of the Queue.
  29. * @returns {this}
  30. */
  31. enqueue(item) {
  32. this._items.push(item);
  33. this.emit(symbolQueueEnqueue, new CollectionEvent(item));
  34. return this;
  35. };
  36. /**
  37. * @returns {T} The first item (at the beginning) of the Queue.
  38. */
  39. dequeue() {
  40. if (this.isEmpty) {
  41. throw new Error('The Queue is empty.');
  42. }
  43. const item = this._items.shift();
  44. this.emit(symbolQueueDequeue, new CollectionEvent(item));
  45. return item;
  46. };
  47. /**
  48. * @param {Number} index The index of the item to peek.
  49. * @returns {T} The item at the index.
  50. * @throws {Error} If the given index is out of range
  51. */
  52. peekIndex(index) {
  53. if (index < 0 || index > (this.size - 1)) {
  54. throw new Error(`The given index is out of range`);
  55. }
  56. return this._items[index];
  57. };
  58. /**
  59. * @param {Number} index The index of the element to remove. The index
  60. * must be in the range [0, this.size - 1]. The first element to take out
  61. * has index 0 (the last element inserted has the largest index, size - 1).
  62. * @returns {T} The dequeued item
  63. * @throws {Error} If the given index is out of range
  64. */
  65. takeOutIndex(index) {
  66. if (index === 0) {
  67. return this.dequeue();
  68. }
  69. if (index < 0 || index > (this.size - 1)) {
  70. throw new Error(`The given index is out of range`);
  71. }
  72. const item = this._items.splice(index, 1)[0];
  73. this.emit(symbolQueueTakeOut, new CollectionEvent(item));
  74. return item;
  75. };
  76. /**
  77. * @param {T} item The item to take ot, must be an item currently on this queue
  78. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults to this queue's
  79. * equality comparer. Used to find the index of the given item.
  80. * @returns {T} The dequeued item.
  81. * @throws {Error} If the item cannot be found in the queue.
  82. */
  83. takeOutItem(item, eqComparer = null) {
  84. /** @type {EqualityComparer<T>} */
  85. eqComparer = eqComparer instanceof EqualityComparer ? eqComparer : this.equalityComparer;
  86. const idx = this._items.findIndex((val, idx) => {
  87. return eqComparer.equals(val, item);
  88. });
  89. return this.takeOutIndex(idx);
  90. };
  91. /**
  92. * @returns {T} The first item without removing it.
  93. */
  94. peek() {
  95. if (this.isEmpty) {
  96. throw new Error('The Queue is empty.');
  97. }
  98. return this._items[0];
  99. };
  100. /**
  101. * @returns {T} The last item without removing it.
  102. */
  103. peekLast() {
  104. if (this.isEmpty) {
  105. throw new Error('The Queue is empty.');
  106. }
  107. return this._items[this.size - 1];
  108. };
  109. };
  110. /**
  111. * @readonly
  112. * @enum {Number}
  113. */
  114. const ConstrainedQueueCapacityPolicy = {
  115. /**
  116. * Dequeue items to make space and accomodate new items.
  117. */
  118. Dequeue: 1,
  119. /**
  120. * Ignore attempts to enqueue more items once the full capacity
  121. * is reached. New items are not enqueued and silently discarded.
  122. */
  123. IgnoreEnqueue: 2,
  124. /**
  125. * Reject attempts to enqueue more items once the full capacity
  126. * is reached. An error is thrown in this case.
  127. */
  128. RejectEnqueue: 4
  129. };
  130. /**
  131. * @template T
  132. * @extends {Queue<T>}
  133. * @author Sebastian Hönel <development@hoenel.net>
  134. */
  135. class ConstrainedQueue extends Queue {
  136. /**
  137. * Creates a new, empty ConstrainedQueue<T>.
  138. *
  139. * @param {Number} [maxSize] Optional. Defaults to Number.MAX_SAFE_INTEGER. Use this parameter to
  140. * limit the maximum amount of elements this Queue can hold. When the limit is reached and items
  141. * are being further enqueued, the ConstrainedQueue will, according to the maximum capacity policy,
  142. * deal with additional items. This parameter must be a positive integer larger than zero.
  143. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  144. * @param {ConstrainedQueueCapacityPolicy|Number} [capacityPolicy] Optional. Defaults to Dequeue.
  145. * How to deal with more items once this queue reaches its full capacity.
  146. */
  147. constructor(
  148. maxSize = Number.MAX_SAFE_INTEGER, eqComparer = EqualityComparer.default,
  149. capacityPolicy = ConstrainedQueueCapacityPolicy.Dequeue
  150. ) {
  151. super(eqComparer);
  152. /** @protected */
  153. this._maxSize = 1;
  154. this.maxSize = maxSize;
  155. /** @protected */
  156. this._capacityPolicy = capacityPolicy;
  157. this.capacityPolicy = capacityPolicy;
  158. };
  159. /**
  160. * Returns the current capacity policy.
  161. *
  162. * @type {ConstrainedQueueCapacityPolicy}
  163. */
  164. get capacityPolicy() {
  165. return this._capacityPolicy;
  166. };
  167. /**
  168. * @param {ConstrainedQueueCapacityPolicy|Number} value
  169. * @type {void}
  170. */
  171. set capacityPolicy(value) {
  172. switch (value) {
  173. case ConstrainedQueueCapacityPolicy.Dequeue:
  174. case ConstrainedQueueCapacityPolicy.IgnoreEnqueue:
  175. case ConstrainedQueueCapacityPolicy.RejectEnqueue:
  176. this._capacityPolicy = value;
  177. break;
  178. default:
  179. throw new Error(`The policy '${value}' is not supported.`);
  180. }
  181. };
  182. /**
  183. * @type {Number}
  184. */
  185. get maxSize() {
  186. return this._maxSize;
  187. };
  188. /**
  189. * Sets the maximum size of this ConstrainedQueue. If currently there are more items, the queue
  190. * will be truncated (i.e. the excess items will be discarded). The excess items will be taken
  191. * from the head of the queue (dequeued).
  192. *
  193. * @param {Number} value The new value for maxSize. Must be an integer equal to or larger than 1.
  194. * @throws {Error} If parameter value is not a number or less than one (1).
  195. * @type {void}
  196. */
  197. set maxSize(value) {
  198. if (!Resolve.isTypeOf(value, Number) || !Number.isInteger(value)) {
  199. throw new Error(`The value given for maxSize is not a number.`);
  200. }
  201. if (value < 1) {
  202. throw new Error(`The value given is less than 1: ${value}`);
  203. }
  204. this._maxSize = value;
  205. this._truncate();
  206. };
  207. /**
  208. * Returns whether this queue has reached its full maximum capacity.
  209. *
  210. * @type {Boolean}
  211. */
  212. get isFull() {
  213. return this.size === this.maxSize;
  214. };
  215. /**
  216. * @protected
  217. * @returns {this}
  218. */
  219. _truncate() {
  220. let excess = this.size - this.maxSize;
  221. while (excess > 0) {
  222. // Triggers/emits symbol for dequeueing items.
  223. this.dequeue();
  224. excess--;
  225. }
  226. return this;
  227. };
  228. /**
  229. * @override
  230. * @inheritdoc
  231. * @param {T} item
  232. * @returns {this}
  233. */
  234. enqueue(item) {
  235. if (this.size === this.maxSize) {
  236. switch (this.capacityPolicy) {
  237. case ConstrainedQueueCapacityPolicy.RejectEnqueue:
  238. throw new Error(`Cannot enqueue more items, Queue is full.`);
  239. case ConstrainedQueueCapacityPolicy.IgnoreEnqueue:
  240. return this;
  241. }
  242. }
  243. super.enqueue(item);
  244. return this._truncate();
  245. };
  246. };
  247. /**
  248. * Adds more policies for @see {ProducerConsumerQueue}, which is also of type
  249. * @see {ConstrainedQueue}.
  250. *
  251. * @see {ConstrainedQueueCapacityPolicy}
  252. * @readonly
  253. * @enum {Number}
  254. */
  255. const ProducerConsumerQueueCapacityPolicy = {
  256. /**
  257. * The default for @see {ProducerConsumerQueue}. When such a queue
  258. * reaches its maximum capacity, further enqueue calls will be
  259. * deferred and handled according to a policy.
  260. */
  261. DeferEnqueue: 8
  262. };
  263. /**
  264. * @template T, TQueue
  265. * @author Sebastian Hönel <development@hoenel.net>
  266. */
  267. class ItemAndDeferred {
  268. /**
  269. * @param {T} item
  270. * @param {Deferred<TQueue>} deferred
  271. */
  272. constructor(item, deferred) {
  273. this.item = item;
  274. this.deferred = deferred;
  275. };
  276. };
  277. /**
  278. * @template T
  279. * @extends {ConstrainedQueue<T>}
  280. * @author Sebastian Hönel <development@hoenel.net>
  281. */
  282. class ProducerConsumerQueue extends ConstrainedQueue {
  283. /**
  284. * Creates a new, empty ProducerConsumerQueue<T>.
  285. *
  286. * @param {Number} [maxSize] Optional. Defaults to Number.MAX_SAFE_INTEGER. Use this parameter to
  287. * limit the maximum amount of elements this Queue can hold. When the limit is reached and items
  288. * are being further enqueued, the ProducerConsumerQueue will defer further calls and only resolve
  289. * them, once space is available and the item has been enqueued. Likewise, an item is dequeued
  290. * immediately if the queue is non-empty. Otherwise, the call is deferred and resolved once an item
  291. * becomes available. Adjusting the capacity of this queue will only affect items in it, but not
  292. * deferred calls to enqueue().
  293. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  294. * @param {ConstrainedQueueCapacityPolicy|ProducerConsumerQueueCapacityPolicy|Number} [capacityPolicy]
  295. * A policy for how to deal with new items once this queue reaches its maximum capacity and further
  296. * items are enqueued. Additionally to @see {ConstrainedQueue}, this queue can also defer further
  297. * calls
  298. * @param {Number} [maxDeferredEnqueues] Optional. The maximum amount of deferred enqueue-calls.
  299. * The behavior can be controlled with @see {maxDeferredEnqueuesCapacityPolicy}.
  300. * @param {ConstrainedQueueCapacityPolicy|Number} [maxDeferredEnqueuesCapacityPolicy] Optional. How
  301. * to deal with enqueue calls when the queue with deferred calls is full.
  302. */
  303. constructor(
  304. maxSize = Number.MAX_SAFE_INTEGER, eqComparer = EqualityComparer.default,
  305. capacityPolicy = ProducerConsumerQueueCapacityPolicy.DeferEnqueue,
  306. maxDeferredEnqueues = Number.MAX_SAFE_INTEGER,
  307. maxDeferredEnqueuesCapacityPolicy = ConstrainedQueueCapacityPolicy.RejectEnqueue
  308. ) {
  309. super(maxSize, eqComparer, capacityPolicy);
  310. /**
  311. * @type {Queue<ItemAndDeferred<T, this>>}
  312. * @protected
  313. */
  314. this._deferredEnqueues = new Queue(eqComparer);
  315. /**
  316. * @type {Queue<Deferred<T>>}
  317. * @protected
  318. */
  319. this._deferredDequeues = new Queue(eqComparer);
  320. /** @protected */
  321. this._maxDeferredEnqueues = maxDeferredEnqueues;
  322. this.maxDeferredEnqueues = maxDeferredEnqueues;
  323. /** @protected */
  324. this._maxDeferredEnqueuesCapacityPolicy = maxDeferredEnqueuesCapacityPolicy;
  325. this.maxDeferredEnqueuesCapacityPolicy = maxDeferredEnqueuesCapacityPolicy;
  326. };
  327. /**
  328. * Returns the amount of deferred enqueue calls (waiting producers).
  329. *
  330. * @type {Number}
  331. */
  332. get numDeferredEnqueues() {
  333. return this._deferredEnqueues.size;
  334. };
  335. /**
  336. * Returns the currently deferred dequeue calls (waiting consumers).
  337. *
  338. * @type {Number}
  339. */
  340. get numDeferredDequeues() {
  341. return this._deferredDequeues.size;
  342. };
  343. /**
  344. * Returns the maximum amount of allowed deferred enqueue calls.
  345. *
  346. * @type {Number}
  347. */
  348. get maxDeferredEnqueues() {
  349. return this._maxDeferredEnqueues;
  350. };
  351. /**
  352. * Sets the maximum amount of deferred enqueue calls. If the new amount
  353. * is smaller than the currently waiting calls, then calls will be dequeued
  354. * in a FIFO manner and rejected.
  355. *
  356. * @see {_truncateDeferredEnqueues}
  357. * @param {Number} value the new maximum amount
  358. * @type {void}
  359. */
  360. set maxDeferredEnqueues(value) {
  361. this._maxDeferredEnqueues = value;
  362. this._truncateDeferredEnqueues();
  363. };
  364. /**
  365. * Returns the policy for dealing with enqueue calls once the waiting queue
  366. * is full.
  367. *
  368. * @type {ConstrainedQueueCapacityPolicy|Number}
  369. */
  370. get maxDeferredEnqueuesCapacityPolicy() {
  371. return this._maxDeferredEnqueuesCapacityPolicy;
  372. };
  373. /**
  374. * Set the policy for dealing with enqueue calls once the waiting queue
  375. * is full.
  376. *
  377. * @param {ConstrainedQueueCapacityPolicy|Number} value
  378. * @type {void}
  379. */
  380. set maxDeferredEnqueuesCapacityPolicy(value) {
  381. switch (value) {
  382. case ConstrainedQueueCapacityPolicy.Dequeue:
  383. case ConstrainedQueueCapacityPolicy.IgnoreEnqueue:
  384. case ConstrainedQueueCapacityPolicy.RejectEnqueue:
  385. this._maxDeferredEnqueuesCapacityPolicy = value;
  386. break;
  387. default:
  388. throw new Error(`The policy '${value}' is not supported.`);
  389. }
  390. };
  391. /**
  392. * Because we override the setter, we need to override the getter.
  393. *
  394. * @override
  395. * @type {ConstrainedQueueCapacityPolicy|ProducerConsumerQueueCapacityPolicy|Number}
  396. */
  397. get capacityPolicy() {
  398. return super.capacityPolicy;
  399. };
  400. /**
  401. * @override
  402. * @param {ConstrainedQueueCapacityPolicy|ProducerConsumerQueueCapacityPolicy|Number} value
  403. * @type {void}
  404. */
  405. set capacityPolicy(value) {
  406. if (value === ProducerConsumerQueueCapacityPolicy.DeferEnqueue) {
  407. this._capacityPolicy = value;
  408. return;
  409. }
  410. return super.capacityPolicy = value;
  411. };
  412. /**
  413. * @protected
  414. */
  415. _itemAvailable() {
  416. while (this.size > 0 && this._deferredDequeues.size > 0) {
  417. const deferred = this._deferredDequeues.dequeue();
  418. deferred.resolve(super.dequeue());
  419. }
  420. };
  421. /**
  422. * @protected
  423. */
  424. _spaceAvailable() {
  425. while (this.size < this.maxSize && this._deferredEnqueues.size > 0) {
  426. const itemAndDef = this._deferredEnqueues.dequeue();
  427. super.enqueue(itemAndDef.item);
  428. itemAndDef.deferred.resolve(this);
  429. }
  430. };
  431. /**
  432. * @protected
  433. */
  434. _truncateDeferredEnqueues() {
  435. while (!this._deferredEnqueues.isEmpty && this._deferredEnqueues.size > this.maxDeferredEnqueues) {
  436. const defAndItem = this._deferredEnqueues.dequeue();
  437. try {
  438. defAndItem.deferred.reject();
  439. } catch (e) { }
  440. }
  441. };
  442. /**
  443. * @override
  444. * @inheritdoc
  445. * @param {T} item The item to be enqueued.
  446. * @returns {Promise<this>} The promise is resolved once the item got enqueued.
  447. */
  448. enqueue(item) {
  449. if (this.size < this.maxSize) {
  450. // There is space in the queue, so we can just enqueue the item.
  451. super.enqueue(item);
  452. setTimeout(this._itemAvailable.bind(this), 0);
  453. return Promise.resolve(this);
  454. } else {
  455. // This queue is full; let's check if we can defer the request:
  456. switch (this.capacityPolicy) {
  457. case ConstrainedQueueCapacityPolicy.Dequeue:
  458. case ConstrainedQueueCapacityPolicy.IgnoreEnqueue:
  459. case ConstrainedQueueCapacityPolicy.RejectEnqueue:
  460. return Promise.resolve(super.enqueue(item));
  461. }
  462. // Up to the maximum capacity of the enqueue-queue, we can defer calls.
  463. if (this.numDeferredEnqueues === this.maxDeferredEnqueues) {
  464. switch (this.maxDeferredEnqueuesCapacityPolicy) {
  465. case ConstrainedQueueCapacityPolicy.IgnoreEnqueue:
  466. return Promise.resolve(this);
  467. case ConstrainedQueueCapacityPolicy.RejectEnqueue:
  468. return Promise.reject('The number of maximum deferred enqueues has been reached.');
  469. }
  470. }
  471. // We have to defer the request and wait for space in the queue.
  472. /** @type {Deferred<this>} */
  473. const deferred = defer();
  474. this._deferredEnqueues.enqueue(new ItemAndDeferred(item, deferred));
  475. this._truncateDeferredEnqueues();
  476. return deferred.promise;
  477. }
  478. };
  479. /**
  480. * @override
  481. * @inheritdoc
  482. * @returns {Promise<T>} The promise is resolved once an item is available.
  483. */
  484. dequeue() {
  485. if (this.isEmpty) {
  486. // We have to wait for an item to be produced.
  487. /** @type {Deferred<T>} */
  488. const deferred = defer();
  489. this._deferredDequeues.enqueue(deferred);
  490. return deferred.promise;
  491. } else {
  492. // Dequeue the first available item.
  493. setTimeout(this._spaceAvailable.bind(this), 0);
  494. return Promise.resolve(super.dequeue());
  495. }
  496. };
  497. };
  498. module.exports = Object.freeze({
  499. Queue,
  500. ConstrainedQueue,
  501. ConstrainedQueueCapacityPolicy,
  502. ProducerConsumerQueue,
  503. ProducerConsumerQueueCapacityPolicy,
  504. symbolQueueEnqueue,
  505. symbolQueueDequeue,
  506. symbolQueueTakeOut
  507. });