const { defer } = require('./tools/Defer')
, { wrapError } = require('./tools/Error')
, EventEmitter = require('events').EventEmitter
, { Observable, Subscription, fromEvent } = require('rxjs')
, { Progress } = require('./Progress')
, { ProcessWrapper } = require('./ProcessWrapper')
, { Queue, ConstrainedQueue } = require('./collections/Queue')
, { EqualityComparer } = require('./collections/EqualityComparer')
, { DictionaryMapBased } = require('./collections/Dictionary')
, symbolRun = Symbol('run')
, symbolDone = Symbol('done')
, symbolFailed = Symbol('failed')
, symbolIdle = Symbol('idle');
class JobEvent {
/**
* @param {Job} job
* @param {Error|any} error
*/
constructor(job, error = void 0) {
this.job = job;
/** @type {Error|null} */
this.error = error ? wrapError(error) : null;
};
};
/**
* The events emitted are (using the declared/exported symbols and the
* class JobEvent):
* - 'run' (immediately triggered before the job is run)
* - 'done' (if successfully ran to completion, includes the result)
* - 'failed' (if failed with an error, includes the error)
*
* @template T
* @author Sebastian Hönel <development@hoenel.net>
*/
class Job extends EventEmitter {
/**
* @param {producerHandler.<Promise.<T>>} promiseProducer A function returning a Promise
* or an async function (essentially the same). If you want to use a sync
* function, call Job::fromSyncFunction(fn) instead.
* @param {String} [name] Optional. Defaults to undefined. Give an optional
* name to this job.
* @see {fromSyncFunction()}
*/
constructor(promiseProducer, name = void 0) {
super();
if (!(promiseProducer instanceof Function)) {
throw new Error(`The given promiseProducer is not a function.`);
}
/** @protected */
this._promiseProducer = promiseProducer;
/** @protected */
this._name = name;
/** @protected */
this._wasStarted = false;
/** @protected */
this._isRunning = false;
/** @protected */
this._isDone = false;
/** @protected */
this._hasFailed = false;
/**
* @type {T}
* @protected
*/
this._result = void 0;
/** @protected */
this._deferred = defer();
/**
* @type {Progress}
* @protected
*/
this._progressInstance = void 0;
/**
* @type {number}
* @protected
*/
this._cost = void 0;
/** @type {Observable.<JobEvent>} */
this.observableRun = Object.freeze(fromEvent(this, symbolRun));
/** @type {Observable.<JobEvent>} */
this.observableDone = Object.freeze(fromEvent(this, symbolDone));
/** @type {Observable.<JobEvent>} */
this.observableFailed = Object.freeze(fromEvent(this, symbolFailed));
/** @protected */
this._createTime = new Date();
/**
* @type {Date}
* @protected
*/
this._startTime = null;
/**
* @type {Date}
* @protected
*/
this._stopTime = null;
/**
* @type {DictionaryMapBased.<any, any>}
* @protected
*/
this._properties = new DictionaryMapBased();
};
/**
* @returns {DictionaryMapBased.<any, any>} A map with arbitrary properties or date
* associated with this Job.
*/
get properties() {
return this._properties;
};
/**
* @returns {Boolean} True, iff this Job was started. This is also true for done or
* failed Jobs or those that are currently running.
*/
get wasStarted() {
return this._wasStarted;
};
/**
* @returns {Boolean} True, iff this Job was previously started and is now either
* done or has failed.
*/
get isStopped() {
return this._isDone || this._hasFailed;
};
/**
* @returns {Date} The timestamp of when this Job was created.
*/
get createTime() {
return new Date(+this._createTime);
};
/**
* @returns {Date} The timestamp of when this Job was started.
* @throws {Error} If the Job was not yet started.
*/
get startTime() {
if (!this.wasStarted ) {
throw new Error('This Job was not yet started.');
}
return new Date(+this._startTime);
};
/**
* @returns {Date} The timestamp of when this Job was stopped (when it finished
* with success or failed).
* @throws {Error} If the Job was not yet stopped.
*/
get stopTime() {
if (!this.isStopped) {
throw new Error('This Job has not yet stopped.');
}
return new Date(+this._stopTime);
};
/**
* @returns {Number} Amount of milliseconds between start- and stop-time of this
* Job. If the Job was not yet started, returns 0. If it is still running, returns
* the time between start and now, otherwise between start and stop.
*/
get runDurationMs() {
if (!this.wasStarted) {
return 0;
}
const stopDate = this.isStopped ? this.stopTime : new Date();
return (stopDate) - (+this.startTime);
};
/**
* @returns {undefined|String} The name of this Job.
*/
get name() {
return this._name;
};
/**
* @param {undefined|String} value The name of this Job.
* @throws {Error} If the value is neither undefined nor a string.
*/
set name(value) {
if (value !== void 0 && typeof value !== 'string') {
throw new Error(`The given value for name must be undefined or a string.`);
}
this._name = value;
};
/**
* Creates a job from a synchronous function by wrapping it into an
* async function.
*
* @param {Function} func A function that does synchronous work and
* returns a value (i.e. its result is not awaited).
* @returns {Job}
*/
static fromSyncFunction(func) {
if (!(func instanceof Function)) {
throw new Error(`The parameter func must be of type Function.`);
}
return new Job(async() => {
return func();
});
};
/**
* Use this setter to give this Job an instance of Progress that may be used
* by the scheduler for this Job. This Job itself does not observe the instance
* or subscribe to its observables.
*
* @param {Progress} progress
*/
set progress(progress) {
if (!(progress instanceof Progress)) {
throw new Error('The progress must be an instance of Progress.');
}
this._progressInstance = progress;
};
/**
* @template TProgress
* @returns {Progress.<TProgress>} The Progress-instance of type TProgress.
*/
get progress() {
return this._progressInstance;
};
/**
* @returns {boolean} Whether or not this Job is/was equipped with an instance of Progress.
*/
get supportsProgress() {
return this._progressInstance instanceof Progress;
};
get hasCost() {
return !isNaN(this._cost) && this._cost > 0;
};
/**
* @param {number|undefined} [value] the new value to use as cost. Set to undefined to
* disable the usage of cost.
*/
set cost(value = void 0) {
if (Object.prototype.toString.call(value) === '[object Number]' && !isNaN(value) && value > 0) {
this._cost = value;
} else if (value === void 0) {
this._cost = void 0;
} else {
throw new Error(`The cost given was '${value}' but only undefined or positive numbers are supported.`);
}
};
/**
* @returns {number|undefined}
*/
get cost() {
return this._cost;
};
/**
* @returns {Boolean}
*/
get isDone() {
return this._isDone;
};
/**
* @returns {Boolean}
*/
get hasFailed() {
return this._hasFailed;
};
/**
* @returns {Boolean}
*/
get isRunning() {
return this._isRunning;
};
/**
* A promise that can be used to await this Job's final state.
* If resolved, will resolve with this Job's result. The promise
* returned by run() should only be used by the queue.
*
* @template T
* @returns {Promise.<T>}
*/
get donePromise() {
return this._deferred.promise;
};
/**
* @template T
* @returns {T}
*/
get result() {
if (!this.isDone) {
throw new Error(`This job is not yet done or has failed.`);
}
return this._result;
};
/**
* @template T
* @returns {Promise.<T>} the Promise that will resolve with the
* job's result.
*/
async run() {
/* Someone has to catch this Promise and it's not guaranteed that some 3rd party
* is observing this. If so, then this empty handler will not interfere with it
* but make sure that we do not get unhandled promise-rejection warnings. */
this.donePromise.catch(_ => {});
this._startTime = new Date();
this._wasStarted = true;
this._isRunning = true;
// Only emit this when we changed this Job's state to running.
this.emit(symbolRun, new JobEvent(this));
try {
const promise = this._promiseProducer();
if (!(promise instanceof Promise)) {
throw new Error(`The promiseProducer does not produce a Promise or is not an async function.`);
}
this._result = await promise;
this._isDone = true;
this._hasFailed = false;
this._deferred.resolve(this.result);
this.emit(symbolDone, new JobEvent(this));
return this.result;
} catch (e) {
this._isDone = false;
this._hasFailed = true;
this._deferred.reject(e);
this.emit(symbolFailed, new JobEvent(this, e));
throw e;
} finally {
this._isRunning = false;
this._stopTime = new Date();
}
};
/**
* Convenience function to create a Job from a ProcessWrapper. Once run, the
* Job will run the ProcessWrapper as Observable. All observed output is then
* reported as Progress, as the Job will be decorated with a Progress-instance
* of type ProcessOutput.
*
* @param {ProcessWrapper} processWrapper
* @returns {Job}
*/
static fromProcess(processWrapper) {
const progress = new Progress();
const job = new Job(() => new Promise((resolve, reject) => {
const obs = processWrapper.runObservable();
const subs = obs.subscribe(
procOut => progress.reportProgress(procOut),
procErr => {
subs.unsubscribe();
reject(procErr);
},
() => resolve(processWrapper.result)
);
}));
job.progress = progress;
return job;
};
};
/**
* This class is used as event that is emitted by the JobQueue.
*/
class JobQueueEvent {
/**
*
* @param {JobQueue} queue
* @param {Job} job
* @param {Error|any} [error] Optional. Defaults to undefined.
*/
constructor(queue, job, error = void 0) {
this.queue = queue;
this.job = job;
/** @type {Error|null} */
this.error = error ? wrapError(error) : null;
};
};
const JobQueueCapacityPolicy = Object.freeze({
/**
* Ignore the policy, and keep on enqueueing items, regardless of
* whether the maximum capacity is reached or not.
*/
Ignore: 0,
/**
* When the maximum capacity is reached, new items added to the
* queue will be silently discarded.
*/
Discard: 1,
/**
* When enqueueing new items and the maximum capacity is already
* reached, an Error is thrown.
*/
ThrowError: 2,
});
/**
* A standard queue for jobs that can handle parallel jobs up to a
* specified degree of parallelism. This queue will run jobs whenever
* there is space on it (i.e. it cannot be paused/resumed). Emits the
* same events as Job using the class JobQueueEvent.
*/
class JobQueue extends EventEmitter {
/**
* @param {Number} [numParallel] Optional. Defaults to 1.
* @param {Number} [capacity] Optional. The maximum capacity of this
* JobQueue. Only has an effect if the chosen policy is not set to
* ignore excess items. The maximum capacity is the total amount of
* Jobs either currently running or in the backlog this JobQueue can
* accomodate.
* @param {JobQueueCapacityPolicy|Number} Optional. The policy to
* use when the maximum capacity is reached, and new items are being
* enqueued.
*/
constructor(numParallel = 1, capacity = Number.MAX_SAFE_INTEGER, capacityPolicy = JobQueueCapacityPolicy.Ignore) {
super();
if (isNaN(numParallel) || !isFinite(numParallel) || numParallel < 1) {
throw new Error(`The value "${numParallel}" for the parameter numParallel is invalid. Only positive integers may be supplied.`);
}
/** @type {Queue.<Job>} */
this.queue = new Queue();
/** @type {ConstrainedQueue.<Job>} */
this.currentJobs = new ConstrainedQueue(numParallel);
this.numParallel = numParallel;
this.capacity = capacity;
this.capacityPolicy = capacityPolicy;
/** @type {Observable.<JobQueueEvent>} */
this.observableRun = Object.freeze(fromEvent(this, symbolRun));
/** @type {Observable.<JobQueueEvent>} */
this.observableDone = Object.freeze(fromEvent(this, symbolDone));
/** @type {Observable.<JobQueueEvent>} */
this.observableFailed = Object.freeze(fromEvent(this, symbolFailed));
/** @type {Observable.<JobQueueEvent>} */
this.observableIdle = Object.freeze(fromEvent(this, symbolIdle));
/** @protected */
this._isPaused = false;
/** @protected */
this._workDone = 0;
/** @protected */
this._workFailed = 0;
/** @protected */
this._numJobsDone = 0;
/** @protected */
this._numJobsFailed = 0;
};
/**
* @returns {number} A value indicating how much work has been done in terms
* of number of Jobs or their cost, depending on the queue's type.
*/
get workDone() {
return this._workDone;
};
/**
* @returns {number} A value indicating how much work has been failed in terms
* of number of Jobs or their cost, depending on the queue's type.
*/
get workFailed() {
return this._workFailed;
};
/**
* @returns {number} A value indicating how many Jobs have been done.
*/
get numJobsDone() {
return this._numJobsDone;
};
/**
* @returns {number} A value indicating how many Jobs have failed.
*/
get numJobsFailed() {
return this._numJobsFailed;
};
/**
* @returns {Boolean}
*/
get isPaused() {
return this._isPaused;
};
/**
* Pauses this queue, which results in no Jobs being further pushed on the
* internal processing queue. Note that already running jobs cannot be
* paused. If the queue is currently busy, you need to wait for the Idle-
* event by subscribing to the Observable.
*
* @see {symbolIdle}
* @see {observableIdle}
* @returns {this}
*/
pause() {
this._isPaused = true;
setTimeout(() => {
if (this.isPaused && this.isIdle) {
this.emit(symbolIdle, new JobQueueEvent(this));
}
}, 0);
return this;
};
/**
* Un-pauses this queue and immediately checks for work that can be done.
*
* @returns {this}
*/
resume() {
this._isPaused = false;
setTimeout(this._runNext.bind(this), 0);
return this;
};
/**
* Runs all jobs currently enqueued and then resolves when all are done. If
* the queue was paused, it will be resumed. Adding more jobs while this call
* is not resolved will defer it.
*
* @returns {Promise.<void>} That will resolve if the queue's current load
* is 0 (no jobs running and none in the backlog) or once it reaches 0 (if
* all currently enqueued jobs are done). The Promise is rejected if any of
* the jobs fails.
*/
runToCompletion() {
return new Promise((resolve, reject) => {
const checkDone = () => {
if (this.load === 0) {
this.emit(symbolIdle, new JobQueueEvent(this));
resolve();
return true;
}
return false;
};
if (checkDone()) {
return;
}
/** @type {Array.<Subscription>} */
const subscriptions = [];
const unsubscribeAll = () => {
subscriptions.forEach(subs => subs.unsubscribe());
};
subscriptions.push(this.observableDone.subscribe(jqEvt => {
if (checkDone()) {
unsubscribeAll();
}
}));
subscriptions.push(this.observableFailed.subscribe(jqEvt => {
unsubscribeAll();
reject(jqEvt.error);
}));
this.resume();
});
};
/**
* @returns {number} A number that represents the ratio between all current
* and enqueued jobs and this queue's parallelism. Numbers greater 1 indicate
* that there are jobs in the backlog. 0 Means the queue is idle and 1 means
* that it is using its full degree of parallelism and no jobs are waiting.
*/
get load() {
const numJobs = this.backlog + this.numJobsRunning;
return numJobs / this.numParallel;
};
/**
* @returns {number} A number indicating the queue's utilization in terms
* of a ratio of how many jobs are currently running and how many could be
* run. This number is therefore in the range [0, 1]. Unlike load, this
* property does not consider jobs in the backlog.
*/
get utilization() {
return this.numJobsRunning / this.numParallel;
};
/**
* @returns {number} The amount of jobs currently running.
*/
get numJobsRunning() {
return this.currentJobs.size;
};
/**
* @returns {number} The amount of jobs on this queue that are waiting
* to be processed.
*/
get backlog() {
return this.queue.size;
};
/**
* @returns {boolean} True if there are no jobs currently processing.
*/
get isIdle() {
return this.numJobsRunning === 0;
};
/**
* @returns {boolean} True if there are any current jobs running.
*/
get isWorking() {
return this.numJobsRunning > 0;
};
/**
* @returns {boolean} True iff the queue is busy and all of its slots
* are currently used actively by jobs (the number of active jobs is
* equal to maximum degree of parallelism).
*/
get isBusy() {
return this.numJobsRunning === this.numParallel;
};
/**
* Removes all jobs from the backlog and returns them.
*
* @template T
* @returns {Array.<Job.<T>>} An array with all not yet run jobs.
*/
clearBacklog() {
const backlogJobs = Array.from(this.queue.entries());
this.queue.clear();
return backlogJobs;
};
/**
* Determines if a specific Job is on the backlog of this JobQueue.
*
* @param {Job.<T>} job The job to check for
* @param {EqualityComparer.<Job.<T>>} [eqComparer] Optional. A comparer
* to use for checking for equality. Defaults to the DefaultEqualityComparer
* which uses the identity operator.
* @returns {Boolean} True, iff the Job is in the backlog.
*/
hasJob(job, eqComparer = EqualityComparer.default) {
return this.queue.has(job, eqComparer);
};
/**
* Determines if a specific Job is currently runnung on this JobQueue.
*
* @param {Job.<T>} job The Job to check for
* @param {EqualityComparer.<Job.<T>>} [eqComparer] Optional. A comparer
* to use for checking for equality. Defaults to the DefaultEqualityComparer
* which uses the identity operator.
* @returns {Boolean} True, iff this JobQueue is currently processing
* the given job.
*/
isJobRunning(job, eqComparer = EqualityComparer.default) {
return this.currentJobs.has(job, eqComparer);
};
/**
*
* @param {Job.<T>} job The Job to remove from the backlog.
* @param {EqualityComparer.<Job.<T>>} [eqComparer] Optional. A comparer
* to use for checking for equality. Defaults to the DefaultEqualityComparer
* which uses the identity operator.
* @returns {this}
*/
removeJobFromBacklog(job, eqComparer = EqualityComparer.default) {
this.queue.takeOutItem(job, eqComparer);
return this;
};
/**
* @template T
* @param {producerHandler.<T>} job A synchronous function to be used as a Job.
* @returns {this}
*/
addSyncJob(job) {
return this.addJob(Job.fromSyncFunction(job));
};
/**
* @template T
* @param {...(producerHandler.<T>)} jobs Many synchronous functions to add as Jobs.
* @returns {this}
*/
addSyncJobs(...jobs) {
jobs.forEach(job => this.addSyncJob(job));
return this;
};
/**
* Add a Job to this Queue. Can either be an instance of Job or an
* async function.
*
* @template T
* @param {Job.<T>|producerHandler.<Promise.<T>>} job
* @returns {this}
*/
addJob(job) {
if (!(job instanceof Job)) {
if (job instanceof Function) {
job = new Job(job);
} else {
throw new Error(`The given Job is not an instance of Job nor is it an instance of Function.`);
}
}
// Let's check capacity:
if (this.capacityPolicy !== JobQueueCapacityPolicy.Ignore && (this.backlog + this.numJobsRunning) === this.capacity) {
// This queue is currently operating at max capacity.
if (this.capacityPolicy === JobQueueCapacityPolicy.Discard) {
return this;
} else if (this.capacityPolicy === JobQueueCapacityPolicy.ThrowError) {
throw new Error(`The JobQueue is at maximum capacity (${this.capacity}) and new Jobs were attempted to add.`);
}
throw new Error(`The policy '${this.capacityPolicy}' is not known.`);
}
this.queue.enqueue(job);
setTimeout(this._runNext.bind(this), 0);
return this;
};
/**
* @template T
* @param {...(Job.<T>|producerHandler.<Promise.<T>>)} jobs
* @returns {this}
*/
addJobs(...jobs) {
jobs.forEach(job => this.addJob(job));
return this;
};
/**
* @protected
*/
_runNext() {
if (this.isPaused) {
if (this.isIdle) {
this.emit(symbolIdle, new JobQueueEvent(this));
}
return;
}
if (this.isBusy) {
return;
}
if (this.queue.isEmpty) {
return;
}
const nextJob = this.queue.dequeue();
const finalFunc = (() => {
this.currentJobs.takeOutItem(nextJob);
setTimeout(this._runNext.bind(this), 0);
}).bind(this);
this.currentJobs.enqueue(nextJob);
nextJob.run().then(val => {
this._numJobsDone++;
this._workDone += (nextJob.hasCost ? nextJob.cost : 1);
finalFunc();
this.emit(symbolDone, new JobQueueEvent(this, nextJob));
}).catch(err => {
this._numJobsFailed++;
this._workFailed += (nextJob.hasCost ? nextJob.cost : 1);
finalFunc();
this.emit(symbolFailed, new JobQueueEvent(this, nextJob, err));
});
// Emit now, after having called Job::run()
this.emit(symbolRun, new JobQueueEvent(this, nextJob));
};
};
module.exports = Object.freeze({
symbolRun,
symbolDone,
symbolFailed,
symbolIdle,
JobEvent,
Job,
JobQueueEvent,
JobQueue,
JobQueueCapacityPolicy
});