Source: collections/Queue.js

  1. const { Collection, CollectionEvent } = require('./Collection')
  2. , { EqualityComparer } = require('./EqualityComparer')
  3. , { Resolve } = require('../tools/Resolve')
  4. , { defer } = require('../tools/Defer')
  5. , { Observable, fromEvent} = require('rxjs')
  6. , symbolQueueEnqueue = Symbol('queueEnqueue')
  7. , symbolQueueDequeue = Symbol('queueDequeue')
  8. , symbolQueueTakeOut = Symbol('queueTakeOut');
  9. /**
  10. * @template T
  11. * @author Sebastian Hönel <development@hoenel.net>
  12. */
  13. class Queue extends Collection {
  14. /**
  15. * Creates a new, empty Queue<T>.
  16. *
  17. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  18. */
  19. constructor(eqComparer = EqualityComparer.default) {
  20. super(eqComparer);
  21. /** @type {Observable<T>} */
  22. this.observableEnqueue = Object.freeze(fromEvent(this, symbolQueueEnqueue));
  23. /** @type {Observable<T>} */
  24. this.observableDequeue = Object.freeze(fromEvent(this, symbolQueueDequeue));
  25. };
  26. /**
  27. * @param {T} item The item to add at the end of the Queue.
  28. * @returns {this}
  29. */
  30. enqueue(item) {
  31. this._items.push(item);
  32. this.emit(symbolQueueEnqueue, new CollectionEvent(item));
  33. return this;
  34. };
  35. /**
  36. * @returns {T} The first item (at the beginning) of the Queue.
  37. */
  38. dequeue() {
  39. if (this.isEmpty) {
  40. throw new Error('The Queue is empty.');
  41. }
  42. const item = this._items.shift();
  43. this.emit(symbolQueueDequeue, new CollectionEvent(item));
  44. return item;
  45. };
  46. /**
  47. * @param {Number} index The index of the item to peek.
  48. * @returns {T} The item at the index.
  49. * @throws {Error} If the given index is out of range
  50. */
  51. peekIndex(index) {
  52. if (index < 0 || index > (this.size - 1)) {
  53. throw new Error(`The given index is out of range`);
  54. }
  55. return this._items[index];
  56. };
  57. /**
  58. * @param {Number} index The index of the element to remove. The index
  59. * must be in the range [0, this.size - 1]. The first element to take out
  60. * has index 0 (the last element inserted has the largest index, size - 1).
  61. * @returns {T} The dequeued item
  62. * @throws {Error} If the given index is out of range
  63. */
  64. takeOutIndex(index) {
  65. if (index === 0) {
  66. return this.dequeue();
  67. }
  68. if (index < 0 || index > (this.size - 1)) {
  69. throw new Error(`The given index is out of range`);
  70. }
  71. const item = this._items.splice(index, 1)[0];
  72. this.emit(symbolQueueTakeOut, new CollectionEvent(item));
  73. return item;
  74. };
  75. /**
  76. * @param {T} item The item to take ot, must be an item currently on this queue
  77. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults to this queue's
  78. * equality comparer. Used to find the index of the given item.
  79. * @returns {T} The dequeued item.
  80. * @throws {Error} If the item cannot be found in the queue.
  81. */
  82. takeOutItem(item, eqComparer = null) {
  83. /** @type {EqualityComparer<T>} */
  84. eqComparer = eqComparer instanceof EqualityComparer ? eqComparer : this.equalityComparer;
  85. const idx = this._items.findIndex((val, idx) => {
  86. return eqComparer.equals(val, item);
  87. });
  88. return this.takeOutIndex(idx);
  89. };
  90. /**
  91. * @returns {T} The first item without removing it.
  92. */
  93. peek() {
  94. if (this.isEmpty) {
  95. throw new Error('The Queue is empty.');
  96. }
  97. return this._items[0];
  98. };
  99. /**
  100. * @returns {T} The last item without removing it.
  101. */
  102. peekLast() {
  103. if (this.isEmpty) {
  104. throw new Error('The Queue is empty.');
  105. }
  106. return this._items[this.size - 1];
  107. };
  108. };
  109. /**
  110. * @template T
  111. * @author Sebastian Hönel <development@hoenel.net>
  112. */
  113. class ConstrainedQueue extends Queue {
  114. /**
  115. * Creates a new, empty ConstrainedQueue<T>.
  116. *
  117. * @param {Number} [maxSize] Optional. Defaults to Number.MAX_SAFE_INTEGER. Use this parameter to
  118. * limit the maximum amount of elements this Queue can hold. When the limit is reached and items
  119. * are being further enqueued, the ConstrainedQueue will dequeue and discard items to make space.
  120. * This parameter must be a positive integer larger than zero.
  121. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  122. */
  123. constructor(maxSize = Number.MAX_SAFE_INTEGER, eqComparer = EqualityComparer.default) {
  124. super(eqComparer);
  125. this._maxSize = 1;
  126. this.maxSize = maxSize;
  127. };
  128. /**
  129. * @type {Number}
  130. */
  131. get maxSize() {
  132. return this._maxSize;
  133. };
  134. /**
  135. * Sets the maximum size of this ConstrainedQueue. If currently there are more items, the queue
  136. * will be truncated (i.e. the excess items will be discarded). The excess items will be taken
  137. * from the head of the queue (dequeued).
  138. *
  139. * @param {Number} value The new value for maxSize. Must be an integer equal to or larger than 1.
  140. * @throws {Error} If parameter value is not a number or less than one (1).
  141. * @type {void}
  142. */
  143. set maxSize(value) {
  144. if (!Resolve.isTypeOf(value, Number) || !Number.isInteger(value)) {
  145. throw new Error(`The value given for maxSize is not a number.`);
  146. }
  147. if (value < 1) {
  148. throw new Error(`The value given is less than 1: ${value}`);
  149. }
  150. this._maxSize = value;
  151. this._truncate();
  152. };
  153. /**
  154. * @protected
  155. * @returns {this}
  156. */
  157. _truncate() {
  158. let excess = this.size - this.maxSize;
  159. while (excess > 0) {
  160. // Triggers/emits symbol for dequeueing items.
  161. this.dequeue();
  162. excess--;
  163. }
  164. return this;
  165. };
  166. /**
  167. * @override
  168. * @inheritdoc
  169. * @param {T} item
  170. * @returns {this}
  171. */
  172. enqueue(item) {
  173. super.enqueue(item);
  174. return this._truncate();
  175. };
  176. };
  177. /**
  178. * @template T, TQueue
  179. * @author Sebastian Hönel <development@hoenel.net>
  180. */
  181. class ItemAndDeferred {
  182. /**
  183. * @param {T} item
  184. * @param {Deferred<TQueue>} deferred
  185. */
  186. constructor(item, deferred) {
  187. this.item = item;
  188. this.deferred = deferred;
  189. };
  190. };
  191. /**
  192. * @template T
  193. * @author Sebastian Hönel <development@hoenel.net>
  194. */
  195. class ProducerConsumerQueue extends ConstrainedQueue {
  196. /**
  197. * Creates a new, empty ProducerConsumerQueue<T>.
  198. *
  199. * @param {Number} [maxSize] Optional. Defaults to Number.MAX_SAFE_INTEGER. Use this parameter to
  200. * limit the maximum amount of elements this Queue can hold. When the limit is reached and items
  201. * are being further enqueued, the ProducerConsumerQueue will defer further calls and only resolve
  202. * them, once space is available and the item has been enqueued. Likewise, an item is dequeued
  203. * immediately if the queue is non-empty. Otherwise, the call is deferred and resolved once an item
  204. * becomes available. Adjusting the capacity of this queue will only affect items in it, but not
  205. * deferred calls to enqueue().
  206. * @param {EqualityComparer<T>} [eqComparer] Optional. Defaults To EqualityComparer<T>.default.
  207. */
  208. constructor(maxSize = Number.MAX_SAFE_INTEGER, eqComparer = EqualityComparer.default) {
  209. super(maxSize, eqComparer);
  210. /**
  211. * @type {Queue<ItemAndDeferred<T, this>>}
  212. * @protected
  213. */
  214. this._deferredEnqueues = new Queue(eqComparer);
  215. /**
  216. * @type {Queue<Deferred<T>>}
  217. * @protected
  218. */
  219. this._deferredDequeues = new Queue(eqComparer);
  220. };
  221. /**
  222. * @protected
  223. */
  224. _itemAvailable() {
  225. while (this.size > 0 && this._deferredDequeues.size > 0) {
  226. const deferred = this._deferredDequeues.dequeue();
  227. deferred.resolve(super.dequeue());
  228. }
  229. };
  230. /**
  231. * @protected
  232. */
  233. _spaceAvailable() {
  234. while (this.size < this.maxSize && this._deferredEnqueues.size > 0) {
  235. const itemAndDef = this._deferredEnqueues.dequeue();
  236. super.enqueue(itemAndDef.item);
  237. itemAndDef.deferred.resolve(this);
  238. }
  239. };
  240. /**
  241. * @override
  242. * @inheritdoc
  243. * @param {T} item The item to be enqueued.
  244. * @returns {Promise<this>} The promise is resolved once the item got enqueued.
  245. */
  246. enqueue(item) {
  247. if (this.size < this.maxSize) {
  248. // There is space in the queue, so we can just enqueue the item.
  249. super.enqueue(item);
  250. setTimeout(this._itemAvailable.bind(this), 0);
  251. return Promise.resolve(this);
  252. } else {
  253. // We have to defer the request and wait for space in the queue.
  254. /** @type {Deferred<this>} */
  255. const deferred = defer();
  256. this._deferredEnqueues.enqueue(new ItemAndDeferred(item, deferred));
  257. return deferred.promise;
  258. }
  259. };
  260. /**
  261. * @override
  262. * @inheritdoc
  263. * @returns {Promise<T>} The promise is resolved once an item is available.
  264. */
  265. dequeue() {
  266. if (this.isEmpty) {
  267. // We have to wait for an item to be produced.
  268. /** @type {Deferred<T>} */
  269. const deferred = defer();
  270. this._deferredDequeues.enqueue(deferred);
  271. return deferred.promise;
  272. } else {
  273. // Dequeue the first available item.
  274. setTimeout(this._spaceAvailable.bind(this), 0);
  275. return Promise.resolve(super.dequeue());
  276. }
  277. };
  278. };
  279. module.exports = Object.freeze({
  280. Queue,
  281. ConstrainedQueue,
  282. ProducerConsumerQueue,
  283. symbolQueueEnqueue,
  284. symbolQueueDequeue,
  285. symbolQueueTakeOut
  286. });