You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

pipeline.js 2.4KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Ported from https://github.com/mafintosh/pump with
  2. // permission from the author, Mathias Buus (@mafintosh).
  3. 'use strict';
  4. var eos;
  5. function once(callback) {
  6. var called = false;
  7. return function () {
  8. if (called) return;
  9. called = true;
  10. callback.apply(void 0, arguments);
  11. };
  12. }
  13. var _require$codes = require('../../../errors').codes,
  14. ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS,
  15. ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED;
  16. function noop(err) {
  17. // Rethrow the error if it exists to avoid swallowing it
  18. if (err) throw err;
  19. }
  20. function isRequest(stream) {
  21. return stream.setHeader && typeof stream.abort === 'function';
  22. }
  23. function destroyer(stream, reading, writing, callback) {
  24. callback = once(callback);
  25. var closed = false;
  26. stream.on('close', function () {
  27. closed = true;
  28. });
  29. if (eos === undefined) eos = require('./end-of-stream');
  30. eos(stream, {
  31. readable: reading,
  32. writable: writing
  33. }, function (err) {
  34. if (err) return callback(err);
  35. closed = true;
  36. callback();
  37. });
  38. var destroyed = false;
  39. return function (err) {
  40. if (closed) return;
  41. if (destroyed) return;
  42. destroyed = true; // request.destroy just do .end - .abort is what we want
  43. if (isRequest(stream)) return stream.abort();
  44. if (typeof stream.destroy === 'function') return stream.destroy();
  45. callback(err || new ERR_STREAM_DESTROYED('pipe'));
  46. };
  47. }
  48. function call(fn) {
  49. fn();
  50. }
  51. function pipe(from, to) {
  52. return from.pipe(to);
  53. }
  54. function popCallback(streams) {
  55. if (!streams.length) return noop;
  56. if (typeof streams[streams.length - 1] !== 'function') return noop;
  57. return streams.pop();
  58. }
  59. function pipeline() {
  60. for (var _len = arguments.length, streams = new Array(_len), _key = 0; _key < _len; _key++) {
  61. streams[_key] = arguments[_key];
  62. }
  63. var callback = popCallback(streams);
  64. if (Array.isArray(streams[0])) streams = streams[0];
  65. if (streams.length < 2) {
  66. throw new ERR_MISSING_ARGS('streams');
  67. }
  68. var error;
  69. var destroys = streams.map(function (stream, i) {
  70. var reading = i < streams.length - 1;
  71. var writing = i > 0;
  72. return destroyer(stream, reading, writing, function (err) {
  73. if (!error) error = err;
  74. if (err) destroys.forEach(call);
  75. if (reading) return;
  76. destroys.forEach(call);
  77. callback(error);
  78. });
  79. });
  80. return streams.reduce(pipe);
  81. }
  82. module.exports = pipeline;