You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

index.js 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. 'use strict'
  2. const tape = require('tape')
  3. , child_process = require('child_process')
  4. , workerFarm = require('../')
  5. , childPath = require.resolve('./child')
  6. , fs = require('fs')
  7. , os = require('os')
  8. function uniq (ar) {
  9. let a = [], i, j
  10. o: for (i = 0; i < ar.length; ++i) {
  11. for (j = 0; j < a.length; ++j) if (a[j] == ar[i]) continue o
  12. a[a.length] = ar[i]
  13. }
  14. return a
  15. }
  16. // a child where module.exports = function ...
  17. tape('simple, exports=function test', function (t) {
  18. t.plan(4)
  19. let child = workerFarm(childPath)
  20. child(0, function (err, pid, rnd) {
  21. t.ok(pid > process.pid, 'pid makes sense')
  22. t.ok(pid < process.pid + 750, 'pid makes sense')
  23. t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  24. })
  25. workerFarm.end(child, function () {
  26. t.ok(true, 'workerFarm ended')
  27. })
  28. })
  29. // a child where we have module.exports.fn = function ...
  30. tape('simple, exports.fn test', function (t) {
  31. t.plan(4)
  32. let child = workerFarm(childPath, [ 'run0' ])
  33. child.run0(function (err, pid, rnd) {
  34. t.ok(pid > process.pid, 'pid makes sense')
  35. t.ok(pid < process.pid + 750, 'pid makes sense')
  36. t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  37. })
  38. workerFarm.end(child, function () {
  39. t.ok(true, 'workerFarm ended')
  40. })
  41. })
  42. tape('on child', function (t) {
  43. t.plan(2)
  44. let child = workerFarm({ onChild: function(subprocess) { childPid = subprocess.pid } }, childPath)
  45. , childPid = null;
  46. child(0, function(err, pid) {
  47. t.equal(childPid, pid)
  48. })
  49. workerFarm.end(child, function () {
  50. t.ok(true, 'workerFarm ended')
  51. })
  52. })
  53. // use the returned pids to check that we're using a single child process
  54. // when maxConcurrentWorkers = 1
  55. tape('single worker', function (t) {
  56. t.plan(2)
  57. let child = workerFarm({ maxConcurrentWorkers: 1 }, childPath)
  58. , pids = []
  59. , i = 10
  60. while (i--) {
  61. child(0, function (err, pid) {
  62. pids.push(pid)
  63. if (pids.length == 10) {
  64. t.equal(1, uniq(pids).length, 'only a single process (by pid)')
  65. } else if (pids.length > 10)
  66. t.fail('too many callbacks!')
  67. })
  68. }
  69. workerFarm.end(child, function () {
  70. t.ok(true, 'workerFarm ended')
  71. })
  72. })
  73. // use the returned pids to check that we're using two child processes
  74. // when maxConcurrentWorkers = 2
  75. tape('two workers', function (t) {
  76. t.plan(2)
  77. let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath)
  78. , pids = []
  79. , i = 10
  80. while (i--) {
  81. child(0, function (err, pid) {
  82. pids.push(pid)
  83. if (pids.length == 10) {
  84. t.equal(2, uniq(pids).length, 'only two child processes (by pid)')
  85. } else if (pids.length > 10)
  86. t.fail('too many callbacks!')
  87. })
  88. }
  89. workerFarm.end(child, function () {
  90. t.ok(true, 'workerFarm ended')
  91. })
  92. })
  93. // use the returned pids to check that we're using a child process per
  94. // call when maxConcurrentWorkers = 10
  95. tape('many workers', function (t) {
  96. t.plan(2)
  97. let child = workerFarm({ maxConcurrentWorkers: 10 }, childPath)
  98. , pids = []
  99. , i = 10
  100. while (i--) {
  101. child(1, function (err, pid) {
  102. pids.push(pid)
  103. if (pids.length == 10) {
  104. t.equal(10, uniq(pids).length, 'pids are all the same (by pid)')
  105. } else if (pids.length > 10)
  106. t.fail('too many callbacks!')
  107. })
  108. }
  109. workerFarm.end(child, function () {
  110. t.ok(true, 'workerFarm ended')
  111. })
  112. })
  113. tape('auto start workers', function (t) {
  114. let child = workerFarm({ maxConcurrentWorkers: 3, autoStart: true }, childPath, ['uptime'])
  115. , pids = []
  116. , count = 5
  117. , i = count
  118. , delay = 250
  119. t.plan(count + 1)
  120. setTimeout(function() {
  121. while (i--)
  122. child.uptime(function (err, uptime) {
  123. t.ok(uptime > 10, 'child has been up before the request (' + uptime + 'ms)')
  124. })
  125. workerFarm.end(child, function () {
  126. t.ok(true, 'workerFarm ended')
  127. })
  128. }, delay)
  129. })
  130. // use the returned pids to check that we're using a child process per
  131. // call when we set maxCallsPerWorker = 1 even when we have maxConcurrentWorkers = 1
  132. tape('single call per worker', function (t) {
  133. t.plan(2)
  134. let child = workerFarm({
  135. maxConcurrentWorkers: 1
  136. , maxConcurrentCallsPerWorker: Infinity
  137. , maxCallsPerWorker: 1
  138. , autoStart: true
  139. }, childPath)
  140. , pids = []
  141. , count = 25
  142. , i = count
  143. while (i--) {
  144. child(0, function (err, pid) {
  145. pids.push(pid)
  146. if (pids.length == count) {
  147. t.equal(count, uniq(pids).length, 'one process for each call (by pid)')
  148. workerFarm.end(child, function () {
  149. t.ok(true, 'workerFarm ended')
  150. })
  151. } else if (pids.length > count)
  152. t.fail('too many callbacks!')
  153. })
  154. }
  155. })
  156. // use the returned pids to check that we're using a child process per
  157. // two-calls when we set maxCallsPerWorker = 2 even when we have maxConcurrentWorkers = 1
  158. tape('two calls per worker', function (t) {
  159. t.plan(2)
  160. let child = workerFarm({
  161. maxConcurrentWorkers: 1
  162. , maxConcurrentCallsPerWorker: Infinity
  163. , maxCallsPerWorker: 2
  164. , autoStart: true
  165. }, childPath)
  166. , pids = []
  167. , count = 20
  168. , i = count
  169. while (i--) {
  170. child(0, function (err, pid) {
  171. pids.push(pid)
  172. if (pids.length == count) {
  173. t.equal(count / 2, uniq(pids).length, 'one process for each call (by pid)')
  174. workerFarm.end(child, function () {
  175. t.ok(true, 'workerFarm ended')
  176. })
  177. } else if (pids.length > count)
  178. t.fail('too many callbacks!')
  179. })
  180. }
  181. })
  182. // use timing to confirm that one worker will process calls sequentially
  183. tape('many concurrent calls', function (t) {
  184. t.plan(2)
  185. let child = workerFarm({
  186. maxConcurrentWorkers: 1
  187. , maxConcurrentCallsPerWorker: Infinity
  188. , maxCallsPerWorker: Infinity
  189. , autoStart: true
  190. }, childPath)
  191. , defer = 200
  192. , count = 200
  193. , i = count
  194. , cbc = 0
  195. setTimeout(function () {
  196. let start = Date.now()
  197. while (i--) {
  198. child(defer, function () {
  199. if (++cbc == count) {
  200. let time = Date.now() - start
  201. // upper-limit not tied to `count` at all
  202. t.ok(time > defer && time < (defer * 2.5), 'processed tasks concurrently (' + time + 'ms)')
  203. workerFarm.end(child, function () {
  204. t.ok(true, 'workerFarm ended')
  205. })
  206. } else if (cbc > count)
  207. t.fail('too many callbacks!')
  208. })
  209. }
  210. }, 250)
  211. })
  212. // use timing to confirm that one child processes calls sequentially with
  213. // maxConcurrentCallsPerWorker = 1
  214. tape('single concurrent call', function (t) {
  215. t.plan(2)
  216. let child = workerFarm({
  217. maxConcurrentWorkers: 1
  218. , maxConcurrentCallsPerWorker: 1
  219. , maxCallsPerWorker: Infinity
  220. , autoStart: true
  221. }, childPath)
  222. , defer = 20
  223. , count = 100
  224. , i = count
  225. , cbc = 0
  226. setTimeout(function () {
  227. let start = Date.now()
  228. while (i--) {
  229. child(defer, function () {
  230. if (++cbc == count) {
  231. let time = Date.now() - start
  232. // upper-limit tied closely to `count`, 1.3 is generous but accounts for all the timers
  233. // coming back at the same time and the IPC overhead
  234. t.ok(time > (defer * count) && time < (defer * count * 1.3), 'processed tasks sequentially (' + time + ')')
  235. workerFarm.end(child, function () {
  236. t.ok(true, 'workerFarm ended')
  237. })
  238. } else if (cbc > count)
  239. t.fail('too many callbacks!')
  240. })
  241. }
  242. }, 250)
  243. })
  244. // use timing to confirm that one child processes *only* 5 calls concurrently
  245. tape('multiple concurrent calls', function (t) {
  246. t.plan(2)
  247. let callsPerWorker = 5
  248. , child = workerFarm({
  249. maxConcurrentWorkers: 1
  250. , maxConcurrentCallsPerWorker: callsPerWorker
  251. , maxCallsPerWorker: Infinity
  252. , autoStart: true
  253. }, childPath)
  254. , defer = 100
  255. , count = 100
  256. , i = count
  257. , cbc = 0
  258. setTimeout(function () {
  259. let start = Date.now()
  260. while (i--) {
  261. child(defer, function () {
  262. if (++cbc == count) {
  263. let time = Date.now() - start
  264. let min = defer * 1.5
  265. // (defer * (count / callsPerWorker + 2)) - if precise it'd be count/callsPerWorker
  266. // but accounting for IPC and other overhead, we need to give it a bit of extra time,
  267. // hence the +2
  268. let max = defer * (count / callsPerWorker + 2)
  269. t.ok(time > min && time < max, 'processed tasks concurrently (' + time + ' > ' + min + ' && ' + time + ' < ' + max + ')')
  270. workerFarm.end(child, function () {
  271. t.ok(true, 'workerFarm ended')
  272. })
  273. } else if (cbc > count)
  274. t.fail('too many callbacks!')
  275. })
  276. }
  277. }, 250)
  278. })
  279. // call a method that will die with a probability of 0.5 but expect that
  280. // we'll get results for each of our calls anyway
  281. tape('durability', function (t) {
  282. t.plan(3)
  283. let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath, [ 'killable' ])
  284. , ids = []
  285. , pids = []
  286. , count = 20
  287. , i = count
  288. while (i--) {
  289. child.killable(i, function (err, id, pid) {
  290. ids.push(id)
  291. pids.push(pid)
  292. if (ids.length == count) {
  293. t.ok(uniq(pids).length > 2, 'processed by many (' + uniq(pids).length + ') workers, but got there in the end!')
  294. t.ok(uniq(ids).length == count, 'received a single result for each unique call')
  295. workerFarm.end(child, function () {
  296. t.ok(true, 'workerFarm ended')
  297. })
  298. } else if (ids.length > count)
  299. t.fail('too many callbacks!')
  300. })
  301. }
  302. })
  303. // a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child)
  304. tape('simple, end callback', function (t) {
  305. t.plan(4)
  306. let child = workerFarm(childPath)
  307. child(0, function (err, pid, rnd) {
  308. t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
  309. t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
  310. t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  311. })
  312. workerFarm.end(child, function() {
  313. t.pass('an .end() callback was successfully called')
  314. })
  315. })
  316. tape('call timeout test', function (t) {
  317. t.plan(3 + 3 + 4 + 4 + 4 + 3 + 1)
  318. let child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath)
  319. // should come back ok
  320. child(50, function (err, pid, rnd) {
  321. t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
  322. t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
  323. t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  324. })
  325. // should come back ok
  326. child(50, function (err, pid, rnd) {
  327. t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
  328. t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
  329. t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  330. })
  331. // should die
  332. child(500, function (err, pid, rnd) {
  333. t.ok(err, 'got an error')
  334. t.equal(err.type, 'TimeoutError', 'correct error type')
  335. t.ok(pid === undefined, 'no pid')
  336. t.ok(rnd === undefined, 'no rnd')
  337. })
  338. // should die
  339. child(1000, function (err, pid, rnd) {
  340. t.ok(err, 'got an error')
  341. t.equal(err.type, 'TimeoutError', 'correct error type')
  342. t.ok(pid === undefined, 'no pid')
  343. t.ok(rnd === undefined, 'no rnd')
  344. })
  345. // should die even though it is only a 100ms task, it'll get caught up
  346. // in a dying worker
  347. setTimeout(function () {
  348. child(100, function (err, pid, rnd) {
  349. t.ok(err, 'got an error')
  350. t.equal(err.type, 'TimeoutError', 'correct error type')
  351. t.ok(pid === undefined, 'no pid')
  352. t.ok(rnd === undefined, 'no rnd')
  353. })
  354. }, 200)
  355. // should be ok, new worker
  356. setTimeout(function () {
  357. child(50, function (err, pid, rnd) {
  358. t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
  359. t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
  360. t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  361. })
  362. workerFarm.end(child, function () {
  363. t.ok(true, 'workerFarm ended')
  364. })
  365. }, 400)
  366. })
  367. tape('test error passing', function (t) {
  368. t.plan(10)
  369. let child = workerFarm(childPath, [ 'err' ])
  370. child.err('Error', 'this is an Error', function (err) {
  371. t.ok(err instanceof Error, 'is an Error object')
  372. t.equal('Error', err.type, 'correct type')
  373. t.equal('this is an Error', err.message, 'correct message')
  374. })
  375. child.err('TypeError', 'this is a TypeError', function (err) {
  376. t.ok(err instanceof Error, 'is a TypeError object')
  377. t.equal('TypeError', err.type, 'correct type')
  378. t.equal('this is a TypeError', err.message, 'correct message')
  379. })
  380. child.err('Error', 'this is an Error with custom props', {foo: 'bar', 'baz': 1}, function (err) {
  381. t.ok(err instanceof Error, 'is an Error object')
  382. t.equal(err.foo, 'bar', 'passes data')
  383. t.equal(err.baz, 1, 'passes data')
  384. })
  385. workerFarm.end(child, function () {
  386. t.ok(true, 'workerFarm ended')
  387. })
  388. })
  389. tape('test maxConcurrentCalls', function (t) {
  390. t.plan(10)
  391. let child = workerFarm({ maxConcurrentCalls: 5 }, childPath)
  392. child(50, function (err) { t.notOk(err, 'no error') })
  393. child(50, function (err) { t.notOk(err, 'no error') })
  394. child(50, function (err) { t.notOk(err, 'no error') })
  395. child(50, function (err) { t.notOk(err, 'no error') })
  396. child(50, function (err) { t.notOk(err, 'no error') })
  397. child(50, function (err) {
  398. t.ok(err)
  399. t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  400. })
  401. child(50, function (err) {
  402. t.ok(err)
  403. t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  404. })
  405. workerFarm.end(child, function () {
  406. t.ok(true, 'workerFarm ended')
  407. })
  408. })
  409. tape('test maxConcurrentCalls + queue', function (t) {
  410. t.plan(13)
  411. let child = workerFarm({ maxConcurrentCalls: 4, maxConcurrentWorkers: 2, maxConcurrentCallsPerWorker: 1 }, childPath)
  412. child(20, function (err) { console.log('ended short1'); t.notOk(err, 'no error, short call 1') })
  413. child(20, function (err) { console.log('ended short2'); t.notOk(err, 'no error, short call 2') })
  414. child(300, function (err) { t.notOk(err, 'no error, long call 1') })
  415. child(300, function (err) { t.notOk(err, 'no error, long call 2') })
  416. child(20, function (err) {
  417. t.ok(err, 'short call 3 should error')
  418. t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  419. })
  420. child(20, function (err) {
  421. t.ok(err, 'short call 4 should error')
  422. t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  423. })
  424. // cross fingers and hope the two short jobs have ended
  425. setTimeout(function () {
  426. child(20, function (err) { t.notOk(err, 'no error, delayed short call 1') })
  427. child(20, function (err) { t.notOk(err, 'no error, delayed short call 2') })
  428. child(20, function (err) {
  429. t.ok(err, 'delayed short call 3 should error')
  430. t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  431. })
  432. workerFarm.end(child, function () {
  433. t.ok(true, 'workerFarm ended')
  434. })
  435. }, 250)
  436. })
  437. // this test should not keep the process running! if the test process
  438. // doesn't die then the problem is here
  439. tape('test timeout kill', function (t) {
  440. t.plan(3)
  441. let child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath, [ 'block' ])
  442. child.block(function (err) {
  443. t.ok(err, 'got an error')
  444. t.equal(err.type, 'TimeoutError', 'correct error type')
  445. })
  446. workerFarm.end(child, function () {
  447. t.ok(true, 'workerFarm ended')
  448. })
  449. })
  450. tape('test max retries after process terminate', function (t) {
  451. t.plan(7)
  452. // temporary file is used to store the number of retries among terminating workers
  453. let filepath1 = '.retries1'
  454. let child1 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5}, childPath, [ 'stubborn' ])
  455. child1.stubborn(filepath1, function (err, result) {
  456. t.notOk(err, 'no error')
  457. t.equal(result, 12, 'correct result')
  458. })
  459. workerFarm.end(child1, function () {
  460. fs.unlinkSync(filepath1)
  461. t.ok(true, 'workerFarm ended')
  462. })
  463. let filepath2 = '.retries2'
  464. let child2 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 3}, childPath, [ 'stubborn' ])
  465. child2.stubborn(filepath2, function (err, result) {
  466. t.ok(err, 'got an error')
  467. t.equal(err.type, 'ProcessTerminatedError', 'correct error type')
  468. t.equal(err.message, 'cancel after 3 retries!', 'correct message and number of retries')
  469. })
  470. workerFarm.end(child2, function () {
  471. fs.unlinkSync(filepath2)
  472. t.ok(true, 'workerFarm ended')
  473. })
  474. })
  475. tape('custom arguments can be passed to "fork"', function (t) {
  476. t.plan(3)
  477. // allocate a real, valid path, in any OS
  478. let cwd = fs.realpathSync(os.tmpdir())
  479. , workerOptions = {
  480. cwd : cwd
  481. , execArgv : ['--expose-gc']
  482. }
  483. , child = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5, workerOptions: workerOptions}, childPath, ['args'])
  484. child.args(function (err, result) {
  485. t.equal(result.execArgv[0], '--expose-gc', 'flags passed (overridden default)')
  486. t.equal(result.cwd, cwd, 'correct cwd folder')
  487. })
  488. workerFarm.end(child, function () {
  489. t.ok(true, 'workerFarm ended')
  490. })
  491. })
  492. tape('ensure --debug/--inspect not propagated to children', function (t) {
  493. t.plan(3)
  494. let script = __dirname + '/debug.js'
  495. , debugArg = process.version.replace(/^v(\d+)\..*$/, '$1') >= 8 ? '--inspect' : '--debug=8881'
  496. , child = child_process.spawn(process.execPath, [ debugArg, script ])
  497. , stdout = ''
  498. child.stdout.on('data', function (data) {
  499. stdout += data.toString()
  500. })
  501. child.on('close', function (code) {
  502. t.equal(code, 0, 'exited without error (' + code + ')')
  503. t.ok(stdout.indexOf('FINISHED') > -1, 'process finished')
  504. t.ok(stdout.indexOf('--debug') === -1, 'child does not receive debug flag')
  505. })
  506. })