# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import queue
import time
import unittest
from test import support
threading = support.import_module('threading')

QUEUE_SIZE = 5

def qfull(q):
    return q.maxsize > 0 and q.qsize() == q.maxsize

# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        self.startedEvent = threading.Event()
        threading.Thread.__init__(self)

    def run(self):
        # The sleep isn't necessary, but is intended to give the blocking
        # function in the main thread a chance at actually blocking before
        # we unclog it.  But if the sleep is longer than the timeout-based
        # tests wait in their blocking functions, those tests will fail.
        # So we give them much longer timeout values compared to the
        # sleep here (I aimed at 10 seconds for blocking functions --
        # they should never actually wait that long - they should make
        # progress as soon as we call self.fn()).
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)

# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.

class BlockingTestMixin:

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        self.result = block_func(*block_args)
        # If block_func returned before our thread made the call, we failed!
        if not self.t.startedEvent.is_set():
            self.fail("blocking function '%r' appeared not to block" %
                      block_func)
        self.t.join(10) # make sure the thread terminates
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            try:
                block_func(*block_args)
            except expected_exception_class:
                raise
            else:
                self.fail("expected exception of kind %r" %
                          expected_exception_class)
        finally:
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")

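# A minimal standalone sketch of the block/trigger pattern described above,
# outside of unittest.  The helper name run_blocking_call and its return
# tuple are assumptions made for illustration; they are not part of this
# test module.  It shows a put() on a full queue blocking in the main thread
# until a helper thread calls get().

```python
import queue
import threading
import time


def run_blocking_call(block_func, block_args, trigger_func, trigger_args,
                      delay=0.1, join_timeout=10):
    """Run block_func here while a helper thread unblocks it.

    Hypothetical helper: returns (result, trigger_started, trigger_finished).
    """
    started = threading.Event()

    def trigger():
        time.sleep(delay)        # give block_func a chance to actually block
        started.set()
        trigger_func(*trigger_args)

    helper = threading.Thread(target=trigger)
    helper.start()
    result = block_func(*block_args)
    # If block_func really blocked, the helper must have started by now.
    trigger_started = started.is_set()
    helper.join(join_timeout)
    return result, trigger_started, not helper.is_alive()


q = queue.Queue(maxsize=1)
q.put("a")                       # the queue is now full
# q.put("b") blocks until the helper thread calls q.get().
result, trigger_started, trigger_finished = run_blocking_call(
    q.put, ("b",), q.get, ())
```

# As in do_blocking_test(), the check on the event only makes sense when
# block_func cannot return (or raise) before the trigger runs.
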
class BaseQueueTestMixin(BlockingTestMixin):
    def setUp(self):
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertTrue(not qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        try:
            q.put(full, block=0)
            self.fail("Didn't appear to block with a full queue")
        except queue.Full:
            pass
        try:
            q.put(full, timeout=0.01)
            self.fail("Didn't appear to time-out with a full queue")
        except queue.Full:
            pass
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        try:
            q.get(block=0)
            self.fail("Didn't appear to block with an empty queue")
        except queue.Empty:
            pass
        try:
            q.get(timeout=0.01)
            self.fail("Didn't appear to time-out with an empty queue")
        except queue.Empty:
            pass
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        self.cum = 0
        for i in (0,1):
            threading.Thread(target=self.worker, args=(q,)).start()
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0,1):
            q.put(-1)         # instruct the threads to close
        q.join()              # verify that you can join twice

    def test_queue_task_done(self):
        # Test that task_done() on a queue with no unfinished tasks raises
        # ValueError instead of letting the task count go negative.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Run the test twice on the same queue to make sure it still works
        # when the same instance is reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)

class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.Queue

class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.LifoQueue

class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.PriorityQueue

# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    pass

class FailingQueue(queue.Queue):
    def __init__(self, *args):
        self.fail_next_put = False
        self.fail_next_get = False
        queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return queue.Queue._get(self)

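# A small standalone sketch of the same failure-injection idea, assuming only
# the standard queue module.  The names FlakyQueue and InjectedFailure are
# hypothetical and exist only for this illustration.  It checks that an
# exception raised from the _put() hook leaves the queue usable afterwards.

```python
import queue


class InjectedFailure(Exception):
    """Hypothetical exception used only in this sketch."""


class FlakyQueue(queue.Queue):
    """Queue whose next put can be made to fail (illustrative name)."""

    def __init__(self, maxsize=0):
        self.fail_next_put = False
        super().__init__(maxsize)

    def _put(self, item):
        # Queue.put() calls _put() while holding the queue's internal lock.
        if self.fail_next_put:
            self.fail_next_put = False
            raise InjectedFailure("injected failure")
        super()._put(item)


q = FlakyQueue(2)
q.fail_next_put = True
try:
    q.put("lost")
    failed = False
except InjectedFailure:
    failed = True

# The failed put must not leave the queue locked or count the lost item.
q.put("kept")
size_after = q.qsize()
```

# If the failed put() had left the lock held, the second put() would hang;
# that is the kind of damage the FailingQueue tests are meant to catch.
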
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):

    def failing_queue_test(self, q):
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        try:
            q.put("oops", block=0)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        q.fail_next_put = True
        try:
            q.put("oops", timeout=0.1)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put
        q.fail_next_put = True
        try:
            self.do_blocking_test(q.put, ("full",), q.get, ())
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertTrue(not qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        try:
            q.get()
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        try:
            q.get(timeout=0.1)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        try:
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)

def test_main():
    support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
                         FailingQueueTest)


if __name__ == "__main__":
    test_main()