Source: ManualScheduler.js

const { Schedule, ScheduleEvent, PreliminaryScheduleEvent, symbolScheduleError, symbolScheduleComplete } = require('./Schedule')
, { Scheduler } = require('./Scheduler')
, { EventEmitter } = require('events')
, { Observable, Subscription, pipe, fromEvent} = require('rxjs')
, { map } = require('rxjs/operators')
, symbolManualSchedulerEvent = Symbol('manualSchedulerEvent');


/**
 * @author Sebastian Hönel <development@hoenel.net>
 */
class ManualScheduler extends Scheduler {
  constructor() {
    super(symbolManualSchedulerEvent);
    
    /** @type {Array.<ManualSchedule>} */
    this.manualSchedules = [];
    /** @type {Object.<string, ManualSchedule>} */
    this._manualScheduleIds = {};
    this._manualScheduleId = 0;

    /** @type {Object.<string, Subscription>} */
    this._subscriptions = {};
  };

  /**
   * @param {ManualSchedule} schedule
   * @throws {Error} if schedule is not of type ManualSchedule
   * @returns {boolean}
   */
  _isManualSchedule(schedule) {
    if (!(schedule instanceof ManualSchedule)) {
      throw new Error('The given schedule is not an instance of ManualSchedule');
    }
    return true;
  };

  /**
   * @param {ManualSchedule} manualSchedule 
   * @returns {string}
   */
  _getManualScheduleId(manualSchedule) {
    this._isManualSchedule(manualSchedule);
    if (!this.hasManualSchedule(manualSchedule)) {
      throw new Error('This manualSchedule was not previously added.');
    }

    for (let key of Object.keys(this._manualScheduleIds)) {
      if (this._manualScheduleIds[key] === manualSchedule) {
        return key;
      }
    }

    throw new Error(`There is no ID for the manualSchedule.`);
  };

  /**
   * Calls hasManualSchedule() with the given Schedule.
   * 
   * @param {ManualSchedule} schedule
   * @returns {boolean}
   */
  hasSchedule(schedule) {
    return this.hasManualSchedule(schedule);
  };

  /**
   * @param {ManualSchedule} manualSchedule
   * @returns {boolean}
   */
  hasManualSchedule(manualSchedule) {
    return this._isManualSchedule(manualSchedule) &&
      this.manualSchedules.findIndex(i => i === manualSchedule) >= 0;
  };

  /**
   * Calls addManualSchedule() with the given Schedule.
   * 
   * @param {Schedule|ManualSchedule} schedule
   * @returns {this}
   */
  addSchedule(schedule) {
    return this.addManualSchedule(schedule);
  };

  /**
   * @param {ManualSchedule} manualSchedule
   * @returns {this}
   */
  addManualSchedule(manualSchedule) {
    if (this.hasManualSchedule(manualSchedule)) {
      throw new Error('This manualSchedule has been added already.');
    }

    this.manualSchedules.push(manualSchedule);

    const id = `i_${this._manualScheduleId++}`;
    this._manualScheduleIds[id] = manualSchedule;
    this._subscriptions[id] = manualSchedule.observable.subscribe(next => {
      this.emit(symbolManualSchedulerEvent, new ManualScheduleEventSimple(manualSchedule, next));
    },
    /* We are not re-throwing that error on our Observable, because than it
     * would error and prevent other Schedule's events. To get a Schedule's
     * errors, observe the Schedule manually. We'll have to register an empty
     * handler so that the error is not thrown globally through rxjs.
    */
    error => { },
    /* We are not observing on-complete as well, as there may be other Schedules. */
    () => { });

    return this;
  };

  /**
   * Calls removeManualSchedule() with the given Schedule.
   * 
   * @param {ManualSchedule} schedule
   * @returns {this}
   */
  removeSchedule(schedule) {
    return this.removeManualSchedule(schedule);
  };

  /**
   * Removes all ManualSchedules from this scheduler. This will lead to
   * un-scheduling all of the schedules.
   * 
   * @inheritDoc
   * @returns {Array.<ManualSchedule>}
   */
  removeAllSchedules() {
    const manualScheds = this.manualSchedules.slice(0);
    manualScheds.forEach(ms => this.removeSchedule(ms));
    return manualScheds;
  };

  /**
   * @param {ManualSchedule} manualSchedule
   * @returns {this}
   */
  removeManualSchedule(manualSchedule) {
    if (!this.hasManualSchedule(manualSchedule)) {
      throw new Error('This manualSchedule was not previously added.');
    }

    const id = this._getManualScheduleId(manualSchedule);
    delete this._manualScheduleIds[id];
    this._subscriptions[id].unsubscribe();
    delete this._subscriptions[id];

    this.manualSchedules.splice(this.manualSchedules.findIndex(ms => ms === manualSchedule), 1);
    return this;
  };

  /**
   * @returns {Observable.<ManualScheduleEventSimple>}
   */
  get observable() {
    return super.observable;
  };

  /**
   * Directly obtains and returns a ManualSchedule's Observable. However, its items
   * are mapped to ManualScheduleEventSimple objects, to conform with the type of
   * this method.
   * 
   * @template T Must be of type ScheduleEvent or more derived.
   * @param {T|Schedule|ManualSchedule} schedule
   * @returns {Observable.<T|ScheduleEvent|ManualScheduleEventSimple>} An Observable
   * for the designated schedule.
   */
  getObservableForSchedule(schedule) {
    this._getManualScheduleId(schedule); // Will throw if schedule is not valid
    return schedule.observable.pipe(map(item => new ManualScheduleEventSimple(schedule, item)));
  };

  /**
   * @inheritdoc
   * @param {Date} [after] Optional. Defaults to undefined.
   * @param {Date} [before] Optional. Defaults to undefined.
   * @returns {IterableIterator.<PreliminaryScheduleEvent.<ManualSchedule, any>>}
   */
  *preliminaryEvents(after = void 0, before = void 0) {
    for (const sched of this.manualSchedules) {
      for (const pre of sched.preliminaryEvents(...arguments)) {
        yield pre;
      }
    }
  };
};


/**
 * @template T
 * 
 * Sebastian Hönel <development@hoenel.net>
 */
class ManualSchedule extends Schedule {
  /**
   * @param {boolean} [enabled] Optional. Defaults to true.
   */
  constructor(enabled = true) {
    super(!!enabled);

    this._emitter = new EventEmitter();
    this._observable = new Observable(subscriber => {
      fromEvent(this._emitter, symbolManualSchedulerEvent).subscribe(nextVal => {
        subscriber.next(nextVal);
      });

      fromEvent(this._emitter, symbolScheduleError).subscribe(nextErr => {
        subscriber.error(nextErr);
      });

      fromEvent(this._emitter, symbolScheduleComplete).subscribe(() => {
        subscriber.complete();
      });
    });
  };

  /**
   * @returns {Observable.<T>}
   */
  get observable() {
    return this._observable;
  };

  /**
   * @throws {Error} If this Schedule is not enabled.
   */
  _requireIsEnabled() {
    if (!this.isEnabled) {
      throw new Error('This ManualSchedule is not enabled.');
    }
  };

  /**
   * The main method to trigger an event on this schedule, that will be observed
   * by the scheduler.
   * 
   * @deprecated Use triggerNext() instead. This method will be removed in v3.x!
   * @param {T} item 
   * @returns {this}
   */
  trigger(item) {
    return this.triggerNext(item);
  };

  /**
   * Calls trigger() and has been added for compatibility reasons to Observable.
   * 
   * @param {T} item
   * @returns {this}
   */
  triggerNext(item) {
    this._requireIsEnabled();
    this._emitter.emit(symbolManualSchedulerEvent, item);
    return this;
  };

  /**
   * Will emit an error on this schedule's Observable. Note that the Observable
   * will end then (i.e. no new items or complete will be emitted).
   * 
   * @param {any} error
   * @returns {this}
   */
  triggerError(error) {
    this._requireIsEnabled();
    this._emitter.emit(symbolScheduleError, error);
    return this;
  };

  /**
   * Will trigger this schedule's Observable's complete-function. Note that the
   * Observable will end then (i.e. no new items or complete will be emitted).
   * 
   * @returns {this}
   */
  triggerComplete() {
    this._requireIsEnabled();
    this._emitter.emit(symbolScheduleComplete);
    return this;
  };
};


/**
 * @author Sebastian Hönel <development@hoenel.net>
 */
class ManualScheduleEventSimple extends ScheduleEvent {
  /**
   * @template T
   * @param {ManualSchedule} manualSchedule
   * @param {T} item
   */
  constructor(manualSchedule, item) {
    super(manualSchedule, item);
  };
};


module.exports = Object.freeze({
  ManualSchedule,
  ManualScheduler,
  ManualScheduleEventSimple,
  symbolManualSchedulerEvent
});