Source: ProcessWrapper.js

  1. const cp = require('child_process')
  2. , EventEmitter = require('events').EventEmitter
  3. , Rx = require('rxjs')
  4. , Observable = Rx.Observable
  5. , fromEvent = Rx.fromEvent
  6. , { mergeObjects } = require('./tools/Objects')
  7. , symbolProcessOutput = Symbol('processOutput');
  8. /**
  9. * General purpose class that can represent any result of a terminated Process.
  10. */
  11. class ProcessResult {
  12. constructor() { };
  13. };
  14. /**
  15. * Inherits from ProcessResult and represents an errored Process. This class
  16. * is usually used for Processes that did not even start or were impossible
  17. * to launch with the given configuration. A Process that started but errored
  18. * then is usually terminated with an instance of ProcessExit.
  19. */
  20. class ProcessErrorResult extends ProcessResult {
  21. /**
  22. * @param {any} error
  23. */
  24. constructor(error) {
  25. super();
  26. this.error = error;
  27. };
  28. };
  29. /**
  30. * Inherits from ProcessResult and represents the details of an exited Process.
  31. * This result can either be positive (non-faulted) or negative. The result will
  32. * provide details about the termination, such as exit-code, signal (if available)
  33. * or the contents of stderr and stdout (if configured to be used).
  34. */
  35. class ProcessExit extends ProcessResult {
  36. /**
  37. * @param {boolean} faulted
  38. * @param {number} code
  39. * @param {string|null} signal
  40. * @param {string|null} stdOut
  41. * @param {string|null} stdErr
  42. */
  43. constructor(faulted, code, signal, stdOut, stdErr) {
  44. super();
  45. this.faulted = faulted;
  46. this.code = code;
  47. this.signal = signal;
  48. this.stdOut = stdOut;
  49. this.stdErr = stdErr;
  50. };
  51. }
  52. /**
  53. * This class is used for whenever a wrapped Process generates any kind
  54. * of output.
  55. */
  56. class ProcessOutput {
  57. /**
  58. * @param {'stdout'|'stderr'} streamType
  59. * @param {Buffer|String} chunk
  60. */
  61. constructor(streamType, chunk) {
  62. this.streamType = streamType;
  63. this.chunk = chunk;
  64. };
  65. get isStdOut() {
  66. return this.streamType === 'stdout';
  67. };
  68. get isStdErr() {
  69. return this.streamType === 'stderr';
  70. };
  71. /**
  72. * Provides the chunk as string, converted to utf-8.
  73. */
  74. get asString() {
  75. return this.chunk instanceof Buffer ?
  76. this.chunk.toString('utf-8') : this.chunk;
  77. };
  78. }
  79. /**
  80. * Wraps (the execution of) a Process that is created using child_process.
  81. * Provides convenient async behavior and Observables as well as eventing.
  82. */
  83. class ProcessWrapper extends EventEmitter {
  84. /**
  85. * @param {string} command
  86. * @param {Array.<string>} args
  87. * @param {SpawnOptions|Object} options The same options that child_process.spawn(..) supports.
  88. */
  89. constructor(command, args = [], options = {}) {
  90. super();
  91. this.command = command;
  92. this.args = args;
  93. this.options = options;
  94. /** @protected */
  95. this._wasStarted = false;
  96. /** @protected */
  97. this._isRunning = false;
  98. /**
  99. * @type {Observable.<ProcessOutput>}
  100. * @protected
  101. */
  102. this._observable = fromEvent(this, symbolProcessOutput);
  103. /**
  104. * @type {ProcessResult}
  105. * @protected
  106. */
  107. this._processResult = void 0;
  108. /**
  109. * @type {NodeJS.Process}
  110. * @protected
  111. */
  112. this._process = void 0;
  113. };
  114. /**
  115. * Returns the underlying process as returned by child_process.
  116. *
  117. * @throws {Error} if this process was not yet started.
  118. * @returns {NodeJS.Process}
  119. */
  120. get process() {
  121. if (!this.wasStarted) {
  122. throw new Error('This process was not yet started.');
  123. }
  124. return this._process;
  125. };
  126. /**
  127. * @returns {Boolean}
  128. */
  129. get wasStarted() {
  130. return this._wasStarted;
  131. };
  132. /**
  133. * @returns {Boolean}
  134. */
  135. get isRunning() {
  136. return this._isRunning;
  137. };
  138. /**
  139. * Creates and returns a general Observable for any output of this Process. The
  140. * Observable will just observe any emits. Thus it never errors and never drains.
  141. *
  142. * @returns {Observable.<ProcessOutput>}
  143. */
  144. get observable() {
  145. return this._observable;
  146. };
  147. /**
  148. * This property will yield a value once the Process terminated. Note that in the
  149. * API of Observable, the complete()-function does not support arguments, so that
  150. * we cannot pass in the result there.
  151. * Instead the wrapper guarantees that, once the Process terminated, its result
  152. * will be made available before any events are omitted or complete() gets called.
  153. *
  154. * @returns {ProcessResult|ProcessErrorResult|ProcessExit}
  155. */
  156. get result() {
  157. return this._processResult;
  158. };
  159. /**
  160. * The default action to run this process. At the moment it defaults to spawn().
  161. * The default behavior may change in the future, so do not rely on this method
  162. * and use the specialized instantiation methods, if you require e.g. fork()
  163. * instead of spawn().
  164. * However regardless, this method guarantees that the process will be executed
  165. * and that its result can be observed or obtained by awaiting its termination
  166. * (that depends on the behavior of 'emitOnly').
  167. *
  168. * @param {boolean} emitOnly If set to true, the resulting value of the resolving
  169. * Promise will be empty (i.e. stdout and stderr will be empty and not be collected
  170. * while the process is running; instead, all output is emitted (events and obser-
  171. * vable) while the process is running). It is recommended to set this to true, if
  172. * the underlying process outputs a lot of data to the std-streams. That data would
  173. * otherwise be held in the node.js process. However, if set to false, the resulting
  174. * value of the Promise will be the process' entire output.
  175. */
  176. async run(emitOnly = true) {
  177. if (this.isRunning) {
  178. throw new Error('The process is already running.');
  179. }
  180. return await this.spawnAsync(emitOnly);
  181. };
  182. /**
  183. * The default action to start and observe this process. At the moment it defaults
  184. * to spawn() and thus spawnObservable(). The default behavior may change in the future,
  185. * so do not rely on this method and use the specialized instantiation methods, if you
  186. * require e.g. fork() instead of spawn().
  187. * However regardless, this method guarantees that the process will be executed and that
  188. * its result can be observed. When the process errors or terminates, the observable will
  189. * forward those errors or call onComplete() accordingly.
  190. *
  191. * @returns {Observable.<ProcessOutput>}
  192. */
  193. runObservable() {
  194. return this.spawnObservable();
  195. };
  196. /**
  197. * Calls 'spawnAsync(emitOnly=true)' so that the entire Process becomes
  198. * observable while it runs. When the process errors or finishes, the
  199. * observable will output an error or complete, respectively.
  200. *
  201. * @returns {Observable.<ProcessOutput>}
  202. */
  203. spawnObservable() {
  204. return new Observable(async subs => {
  205. const subscription = fromEvent(this, symbolProcessOutput)
  206. .subscribe(next => {
  207. subs.next(next);
  208. });
  209. try {
  210. await this.spawnAsync(true);
  211. subs.complete();
  212. } catch (e) {
  213. subs.error(e);
  214. } finally {
  215. subscription.unsubscribe();
  216. }
  217. });
  218. };
  219. /**
  220. * Launches the process using spawn().
  221. *
  222. * @param {boolean} emitOnly Has the same effect as 'run(emitOnly)', please refer
  223. * to the documentation there.
  224. * @returns {Promise.<ProcessResult>} The Promise will be rejected if the process
  225. * exits with a non-zero result or if, when spawning it, an error occurs. In the
  226. * first case, the result will be of type ProcessExit, while in the latter case, it
  227. * will be of type ProcessErrorResult. Both types inherit from ProcessResult.
  228. */
  229. spawnAsync(emitOnly = true) {
  230. return new Promise((resolve, reject) => {
  231. if (this.isRunning) {
  232. throw new Error('The process is already running.');
  233. }
  234. this._isRunning = true;
  235. /**
  236. * @param {ProcessResult} procExit
  237. */
  238. const shutdownFunc = procExit => {
  239. this._isRunning = false;
  240. this._processResult = procExit;
  241. if (procExit instanceof ProcessErrorResult
  242. || (procExit instanceof ProcessExit && procExit.faulted)) {
  243. reject(procExit);
  244. } else {
  245. resolve(procExit);
  246. }
  247. };
  248. /**
  249. * @param {'stdout'|'stderr'} streamName
  250. * @param {Buffer|String} chunk
  251. * @param {Array.<String>} streamArr
  252. */
  253. const dataFn = (streamName, chunk, streamArr) => {
  254. const out = new ProcessOutput(streamName, chunk);
  255. if (!emitOnly) {
  256. streamArr.push(out.asString);
  257. }
  258. this.emit(symbolProcessOutput, out);
  259. };
  260. const stdOut = [], stdErr = [];
  261. const options = mergeObjects({}, this.options);
  262. options.stdio = 'pipe';
  263. const proc = cp.spawn(this.command, this.args, this.options)
  264. .once('error', err =>
  265. shutdownFunc(new ProcessErrorResult(err)))
  266. .once('exit', (code, sig) =>
  267. shutdownFunc(new ProcessExit(
  268. code !== 0, code, sig, stdOut.join(''), stdErr.join(''))));
  269. this._wasStarted = true;
  270. proc.stdout.on('data', chunk => dataFn('stdout', chunk, stdOut));
  271. proc.stderr.on('data', chunk => dataFn('stderr', chunk, stdErr));
  272. // Expose, after everything is set up:
  273. this._process = proc;
  274. });
  275. };
  276. };
  277. module.exports = Object.freeze({
  278. ProcessResult,
  279. ProcessErrorResult,
  280. ProcessExit,
  281. ProcessOutput,
  282. ProcessWrapper,
  283. symbolProcessOutput
  284. });