Source: ManualScheduler.js

const { Schedule, ScheduleEvent, PreliminaryScheduleEvent, symbolScheduleError, symbolScheduleComplete } = require('./Schedule')
, { Scheduler } = require('./Scheduler')
, { EventEmitter } = require('events')
, { Observable, Subscription, pipe, fromEvent} = require('rxjs')
, { map } = require('rxjs/operators')
, symbolManualSchedulerEvent = Symbol('manualSchedulerEvent');


/**
 * @author Sebastian Hönel <development@hoenel.net>
 */
class ManualScheduler extends Scheduler {
	constructor() {
		super(symbolManualSchedulerEvent);
		
		/** @type {Array<ManualSchedule>} */
		this.manualSchedules = [];
		/**
		 * @type {Object<string, ManualSchedule>}
		 * @protected
		 */
		this._manualScheduleIds = {};
		/** @protected */
		this._manualScheduleId = 0;

		/**
		 * @type {Object<string, Subscription>}
		 * @protected
		 */
		this._subscriptions = {};
	};

	/**
	 * @protected
	 * @param {ManualSchedule} schedule
	 * @throws {Error} if schedule is not of type ManualSchedule
	 * @returns {Boolean}
	 */
	_isManualSchedule(schedule) {
		if (!(schedule instanceof ManualSchedule)) {
			throw new Error('The given schedule is not an instance of ManualSchedule');
		}
		return true;
	};

	/**
	 * @protected
	 * @param {ManualSchedule} manualSchedule 
	 * @returns {String}
	 */
	_getManualScheduleId(manualSchedule) {
		this._isManualSchedule(manualSchedule);
		if (!this.hasManualSchedule(manualSchedule)) {
			throw new Error('This manualSchedule was not previously added.');
		}

		for (let key of Object.keys(this._manualScheduleIds)) {
			if (this._manualScheduleIds[key] === manualSchedule) {
				return key;
			}
		}

		throw new Error(`There is no ID for the manualSchedule.`);
	};

	/**
	 * Calls hasManualSchedule() with the given Schedule.
	 * 
	 * @param {ManualSchedule} schedule
	 * @returns {Boolean}
	 */
	hasSchedule(schedule) {
		return this.hasManualSchedule(schedule);
	};

	/**
	 * @param {ManualSchedule} manualSchedule
	 * @returns {Boolean}
	 */
	hasManualSchedule(manualSchedule) {
		return this._isManualSchedule(manualSchedule) &&
			this.manualSchedules.findIndex(i => i === manualSchedule) >= 0;
	};

	/**
	 * Calls addManualSchedule() with the given Schedule.
	 * 
	 * @param {Schedule|ManualSchedule} schedule
	 * @returns {this}
	 */
	addSchedule(schedule) {
		return this.addManualSchedule(schedule);
	};

	/**
	 * @param {ManualSchedule} manualSchedule
	 * @returns {this}
	 */
	addManualSchedule(manualSchedule) {
		if (this.hasManualSchedule(manualSchedule)) {
			throw new Error('This manualSchedule has been added already.');
		}

		this.manualSchedules.push(manualSchedule);

		const id = `i_${this._manualScheduleId++}`;
		this._manualScheduleIds[id] = manualSchedule;
		this._subscriptions[id] = manualSchedule.observable.subscribe(next => {
			this.emit(symbolManualSchedulerEvent, new ManualScheduleEventSimple(manualSchedule, next));
		},
		/* We are not re-throwing that error on our Observable, because than it
		 * would error and prevent other Schedule's events. To get a Schedule's
		 * errors, observe the Schedule manually. We'll have to register an empty
		 * handler so that the error is not thrown globally through rxjs.
		*/
		error => { },
		/* We are not observing on-complete as well, as there may be other Schedules. */
		() => { });

		return this;
	};

	/**
	 * Calls removeManualSchedule() with the given Schedule.
	 * 
	 * @param {ManualSchedule} schedule
	 * @returns {this}
	 */
	removeSchedule(schedule) {
		return this.removeManualSchedule(schedule);
	};

	/**
	 * Removes all ManualSchedules from this scheduler. This will lead to
	 * un-scheduling all of the schedules.
	 * 
	 * @override
	 * @inheritDoc
	 * @returns {Array<ManualSchedule>}
	 */
	removeAllSchedules() {
		const manualScheds = this.manualSchedules.slice(0);
		manualScheds.forEach(ms => this.removeSchedule(ms));
		return manualScheds;
	};

	/**
	 * @param {ManualSchedule} manualSchedule
	 * @returns {this}
	 */
	removeManualSchedule(manualSchedule) {
		if (!this.hasManualSchedule(manualSchedule)) {
			throw new Error('This manualSchedule was not previously added.');
		}

		const id = this._getManualScheduleId(manualSchedule);
		delete this._manualScheduleIds[id];
		this._subscriptions[id].unsubscribe();
		delete this._subscriptions[id];

		this.manualSchedules.splice(this.manualSchedules.findIndex(ms => ms === manualSchedule), 1);
		return this;
	};

	/**
	 * @type {Observable<ManualScheduleEventSimple>}
	 */
	get observable() {
		return super.observable;
	};

	/**
	 * Directly obtains and returns a ManualSchedule's Observable. However, its items
	 * are mapped to ManualScheduleEventSimple objects, to conform with the type of
	 * this method.
	 * 
	 * @template T Must be of type ScheduleEvent or more derived.
	 * @param {T|Schedule|ManualSchedule} schedule
	 * @returns {Observable<T|ScheduleEvent|ManualScheduleEventSimple>} An Observable
	 * for the designated schedule.
	 */
	getObservableForSchedule(schedule) {
		this._getManualScheduleId(schedule); // Will throw if schedule is not valid
		return schedule.observable.pipe(map(item => new ManualScheduleEventSimple(schedule, item)));
	};

	/**
	 * @override
	 * @inheritdoc
	 * @param {Date} [after] Optional. Defaults to undefined.
	 * @param {Date} [before] Optional. Defaults to undefined.
	 * @returns {IterableIterator<PreliminaryScheduleEvent<ManualSchedule, any>>}
	 */
	*preliminaryEvents(after = void 0, before = void 0) {
		for (const sched of this.manualSchedules) {
			for (const pre of sched.preliminaryEvents(...arguments)) {
				yield pre;
			}
		}
	};
};


/**
 * @template T
 * 
 * Sebastian Hönel <development@hoenel.net>
 */
class ManualSchedule extends Schedule {
	/**
	 * @param {Boolean} [enabled] Optional. Defaults to true.
	 */
	constructor(enabled = true) {
		super(!!enabled);

		/** @protected */
		this._emitter = new EventEmitter();
		/** @protected */
		this._observable = new Observable(subscriber => {
			fromEvent(this._emitter, symbolManualSchedulerEvent).subscribe(nextVal => {
				subscriber.next(nextVal);
			});

			fromEvent(this._emitter, symbolScheduleError).subscribe(nextErr => {
				subscriber.error(nextErr);
			});

			fromEvent(this._emitter, symbolScheduleComplete).subscribe(() => {
				subscriber.complete();
			});
		});
	};

	/**
	 * @type {Observable<T>}
	 */
	get observable() {
		return this._observable;
	};

	/**
	 * @protected
	 * @throws {Error} If this Schedule is not enabled.
	 */
	_requireIsEnabled() {
		if (!this.isEnabled) {
			throw new Error('This ManualSchedule is not enabled.');
		}
	};

	/**
	 * The main method to trigger an event on this schedule, that will be observed
	 * by the scheduler.
	 * 
	 * @deprecated Use triggerNext() instead. This method will be removed in v3.x!
	 * @param {T} item 
	 * @returns {this}
	 */
	trigger(item) {
		return this.triggerNext(item);
	};

	/**
	 * Calls trigger() and has been added for compatibility reasons to Observable.
	 * 
	 * @param {T} item
	 * @returns {this}
	 */
	triggerNext(item) {
		this._requireIsEnabled();
		this._emitter.emit(symbolManualSchedulerEvent, item);
		return this;
	};

	/**
	 * Will emit an error on this schedule's Observable. Note that the Observable
	 * will end then (i.e. no new items or complete will be emitted).
	 * 
	 * @param {any} error
	 * @returns {this}
	 */
	triggerError(error) {
		this._requireIsEnabled();
		this._emitter.emit(symbolScheduleError, error);
		return this;
	};

	/**
	 * Will trigger this schedule's Observable's complete-function. Note that the
	 * Observable will end then (i.e. no new items or complete will be emitted).
	 * 
	 * @returns {this}
	 */
	triggerComplete() {
		this._requireIsEnabled();
		this._emitter.emit(symbolScheduleComplete);
		return this;
	};
};


/**
 * @author Sebastian Hönel <development@hoenel.net>
 */
class ManualScheduleEventSimple extends ScheduleEvent {
	/**
	 * @template T
	 * @param {ManualSchedule} manualSchedule
	 * @param {T} item
	 */
	constructor(manualSchedule, item) {
		super(manualSchedule, item);
	};
};


module.exports = Object.freeze({
	ManualSchedule,
	ManualScheduler,
	ManualScheduleEventSimple,
	symbolManualSchedulerEvent
});