You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

153 lines
3.8 KiB

  1. import multiprocessing
  2. import time
  3. import random
  4. import sys
  5. #
  6. # Functions used by test code
  7. #
  8. def calculate(func, args):
  9. result = func(*args)
  10. return '%s says that %s%s = %s' % (
  11. multiprocessing.current_process().name,
  12. func.__name__, args, result
  13. )
  14. def calculatestar(args):
  15. return calculate(*args)
  16. def mul(a, b):
  17. time.sleep(0.5 * random.random())
  18. return a * b
  19. def plus(a, b):
  20. time.sleep(0.5 * random.random())
  21. return a + b
  22. def f(x):
  23. return 1.0 / (x - 5.0)
  24. def pow3(x):
  25. return x ** 3
  26. def noop(x):
  27. pass
  28. #
  29. # Test code
  30. #
  31. def test():
  32. PROCESSES = 4
  33. print('Creating pool with %d processes\n' % PROCESSES)
  34. with multiprocessing.Pool(PROCESSES) as pool:
  35. #
  36. # Tests
  37. #
  38. TASKS = [(mul, (i, 7)) for i in range(10)] + \
  39. [(plus, (i, 8)) for i in range(10)]
  40. results = [pool.apply_async(calculate, t) for t in TASKS]
  41. imap_it = pool.imap(calculatestar, TASKS)
  42. imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
  43. print('Ordered results using pool.apply_async():')
  44. for r in results:
  45. print('\t', r.get())
  46. print()
  47. print('Ordered results using pool.imap():')
  48. for x in imap_it:
  49. print('\t', x)
  50. print()
  51. print('Unordered results using pool.imap_unordered():')
  52. for x in imap_unordered_it:
  53. print('\t', x)
  54. print()
  55. print('Ordered results using pool.map() --- will block till complete:')
  56. for x in pool.map(calculatestar, TASKS):
  57. print('\t', x)
  58. print()
  59. #
  60. # Test error handling
  61. #
  62. print('Testing error handling:')
  63. try:
  64. print(pool.apply(f, (5,)))
  65. except ZeroDivisionError:
  66. print('\tGot ZeroDivisionError as expected from pool.apply()')
  67. else:
  68. raise AssertionError('expected ZeroDivisionError')
  69. try:
  70. print(pool.map(f, list(range(10))))
  71. except ZeroDivisionError:
  72. print('\tGot ZeroDivisionError as expected from pool.map()')
  73. else:
  74. raise AssertionError('expected ZeroDivisionError')
  75. try:
  76. print(list(pool.imap(f, list(range(10)))))
  77. except ZeroDivisionError:
  78. print('\tGot ZeroDivisionError as expected from list(pool.imap())')
  79. else:
  80. raise AssertionError('expected ZeroDivisionError')
  81. it = pool.imap(f, list(range(10)))
  82. for i in range(10):
  83. try:
  84. x = next(it)
  85. except ZeroDivisionError:
  86. if i == 5:
  87. pass
  88. except StopIteration:
  89. break
  90. else:
  91. if i == 5:
  92. raise AssertionError('expected ZeroDivisionError')
  93. assert i == 9
  94. print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
  95. print()
  96. #
  97. # Testing timeouts
  98. #
  99. print('Testing ApplyResult.get() with timeout:', end=' ')
  100. res = pool.apply_async(calculate, TASKS[0])
  101. while 1:
  102. sys.stdout.flush()
  103. try:
  104. sys.stdout.write('\n\t%s' % res.get(0.02))
  105. break
  106. except multiprocessing.TimeoutError:
  107. sys.stdout.write('.')
  108. print()
  109. print()
  110. print('Testing IMapIterator.next() with timeout:', end=' ')
  111. it = pool.imap(calculatestar, TASKS)
  112. while 1:
  113. sys.stdout.flush()
  114. try:
  115. sys.stdout.write('\n\t%s' % it.next(0.02))
  116. except StopIteration:
  117. break
  118. except multiprocessing.TimeoutError:
  119. sys.stdout.write('.')
  120. print()
  121. print()
  122. if __name__ == '__main__':
  123. multiprocessing.freeze_support()
  124. test()