|
|
|
@ -2101,20 +2101,16 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): |
|
|
|
|
|
|
|
|
|
|
|
class RunCoroutineThreadsafeTests(test_utils.TestCase): |
|
|
|
"""Test case for futures.submit_to_loop.""" |
|
|
|
"""Test case for asyncio.run_coroutine_threadsafe.""" |
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
self.loop = self.new_test_loop(self.time_gen) |
|
|
|
|
|
|
|
def time_gen(self): |
|
|
|
"""Handle the timer.""" |
|
|
|
yield 0 # second |
|
|
|
yield 1 # second |
|
|
|
self.loop = asyncio.new_event_loop() |
|
|
|
self.set_event_loop(self.loop) # Will cleanup properly |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def add(self, a, b, fail=False, cancel=False): |
|
|
|
"""Wait 1 second and return a + b.""" |
|
|
|
yield from asyncio.sleep(1, loop=self.loop) |
|
|
|
"""Wait 0.05 second and return a + b.""" |
|
|
|
yield from asyncio.sleep(0.05, loop=self.loop) |
|
|
|
if fail: |
|
|
|
raise RuntimeError("Fail!") |
|
|
|
if cancel: |
|
|
|
@ -2122,10 +2118,20 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): |
|
|
|
yield |
|
|
|
return a + b |
|
|
|
|
|
|
|
def target(self, fail=False, cancel=False, timeout=None): |
|
|
|
def target(self, fail=False, cancel=False, timeout=None, |
|
|
|
advance_coro=False): |
|
|
|
"""Run add coroutine in the event loop.""" |
|
|
|
coro = self.add(1, 2, fail=fail, cancel=cancel) |
|
|
|
future = asyncio.run_coroutine_threadsafe(coro, self.loop) |
|
|
|
if advance_coro: |
|
|
|
# this is for test_run_coroutine_threadsafe_task_factory_exception; |
|
|
|
# otherwise it spills errors and breaks **other** unittests, since |
|
|
|
# 'target' is interacting with threads. |
|
|
|
|
|
|
|
# With this call, `coro` will be advanced, so that |
|
|
|
# CoroWrapper.__del__ won't do anything when asyncio tests run |
|
|
|
# in debug mode. |
|
|
|
self.loop.call_soon_threadsafe(coro.send, None) |
|
|
|
try: |
|
|
|
return future.result(timeout) |
|
|
|
finally: |
|
|
|
@ -2152,7 +2158,6 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): |
|
|
|
future = self.loop.run_in_executor(None, callback) |
|
|
|
with self.assertRaises(asyncio.TimeoutError): |
|
|
|
self.loop.run_until_complete(future) |
|
|
|
# Clear the time generator and tasks |
|
|
|
test_utils.run_briefly(self.loop) |
|
|
|
# Check that there's no pending task (add has been cancelled) |
|
|
|
for task in asyncio.Task.all_tasks(self.loop): |
|
|
|
@ -2169,10 +2174,9 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): |
|
|
|
def test_run_coroutine_threadsafe_task_factory_exception(self): |
|
|
|
"""Test coroutine submission from a tread to an event loop |
|
|
|
when the task factory raise an exception.""" |
|
|
|
# Clear the time generator |
|
|
|
asyncio.ensure_future(self.add(1, 2), loop=self.loop) |
|
|
|
# Schedule the target |
|
|
|
future = self.loop.run_in_executor(None, self.target) |
|
|
|
future = self.loop.run_in_executor( |
|
|
|
None, lambda: self.target(advance_coro=True)) |
|
|
|
# Set corrupted task factory |
|
|
|
self.loop.set_task_factory(lambda loop, coro: wrong_name) |
|
|
|
# Set exception handler |
|
|
|
|