from test import support

# Skip tests if _multiprocessing wasn't built.
support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
support.import_module('multiprocessing.synchronize')

from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util

def create_future(state=PENDING, exception=None, result=None):
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
    return x * y


def capture(*args, **kwargs):
    return args, kwargs


def sleep_and_raise(t):
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


def init(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def get_init_status():
    return INITIALIZER_STATUS


def init_fail(log_queue=None):
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class MyObject(object):
    def my_method(self):
        pass


class EventfulGCObj():
    def __init__(self, ctx):
        mgr = get_context(ctx).Manager()
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


def make_dummy_object(_):
    return MyObject()
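
# Note: the helpers above are defined at module level (rather than inside the
# test classes) so that they can be pickled by reference and sent to worker
# processes when the tests exercise ProcessPoolExecutor.
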
class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._thread_key = support.threading_setup()

    def tearDown(self):
        support.reap_children()
        support.threading_cleanup(*self._thread_key)


class ExecutorMixin:
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        futures = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for f in futures:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolForkMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"


class ProcessPoolForkserverMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    def strip_mixin(name):
        if name.endswith(('Mixin', 'Tests')):
            return name[:-5]
        elif name.endswith('Test'):
            return name[:-4]
        else:
            return name

    for exe in executor_mixins:
        name = ("%s%sTest"
                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
        cls = type(name, (mixin,) + (exe,) + bases, {})
        globals()[name] = cls
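
# create_executor_tests() synthesizes one concrete TestCase per executor mixin
# and installs it in the module globals; e.g. create_executor_tests(InitializerMixin)
# defines ThreadPoolInitializerTest, ProcessPoolForkInitializerTest,
# ProcessPoolForkserverInitializerTest and ProcessPoolSpawnInitializerTest.
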
class InitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        if self.log_queue is not None:
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
class ExecutorShutdownTest:
    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code;
        # check stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code;
        # check stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def _prime_executor(self):
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()


create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class WaitTests:

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
                [EXCEPTION_FUTURE, future1],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class AsCompletedTests:
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)
class ExecutorTest:

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            self.assertIsNone(wr())
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)
class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_resources_gced_in_workers(self):
        # Ensure that the arguments for a job are correctly gc-ed after the
        # job is finished.
        obj = EventfulGCObj(self.ctx)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))


create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
def hide_process_stderr():
    import io
    sys.stderr = io.StringIO()


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    hide_process_stderr()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    hide_process_stderr()
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error, (UnpicklingError, )
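
# The helpers above use __reduce__ to inject a failure (segfault, sys.exit or
# an exception) at a specific stage of the executor's serialization pipeline:
# while pickling or unpickling the submitted task, or while pickling or
# unpickling its result. ExecutorDeadlockTest below drives each case and
# checks that the pool breaks cleanly instead of deadlocking.
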
class ExecutorDeadlockTest:
    TIMEOUT = support.SHORT_TIMEOUT

    @classmethod
    def _sleep_id(cls, x, delay):
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def test_crash(self):
        # Extensive testing for deadlocks caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with support.captured_stderr():
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in _fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()


create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class FutureTests(BaseTestCase):
    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError,
                          f1.result, timeout=support.SHORT_TIMEOUT)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
                                   OSError))
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
        t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
_threads_key = None


def setUpModule():
    global _threads_key
    _threads_key = support.threading_setup()


def tearDownModule():
    support.threading_cleanup(*_threads_key)
    support.reap_children()

    # cleanup multiprocessing
    multiprocessing.process._cleanup()
    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()
    # bpo-37421: Explicitly call _run_finalizers() to immediately remove
    # temporary directories created by multiprocessing.util.get_temp_dir().
    multiprocessing.util._run_finalizers()
    support.gc_collect()


if __name__ == "__main__":
    unittest.main()