You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

611 lines
18 KiB

  1. # This file should be kept compatible with both Python 2.6 and Python >= 3.0.
  2. from __future__ import division
  3. from __future__ import print_function
  4. """
  5. ccbench, a Python concurrency benchmark.
  6. """
  7. import time
  8. import os
  9. import sys
  10. import itertools
  11. import threading
  12. import subprocess
  13. import socket
  14. from optparse import OptionParser, SUPPRESS_HELP
  15. import platform
# Compatibility
# When the name ``xrange`` is not defined, alias it to ``range``.
try:
    xrange
except NameError:
    xrange = range
# When ``itertools.imap`` exists, rebind the module-level name ``map`` to it;
# otherwise the builtin ``map`` is left as is.
try:
    map = itertools.imap
except AttributeError:
    pass

# Minimum run time of each timed loop in the throughput tests.  It is compared
# against differences of time.time() values, i.e. it is expressed in seconds.
THROUGHPUT_DURATION = 2.0
# Delay the latency client sleeps between two pings (passed to time.sleep()).
LATENCY_PING_INTERVAL = 0.1
# Nominal length of a latency run; the number of pings is derived from it as
# int(LATENCY_DURATION / LATENCY_PING_INTERVAL).
LATENCY_DURATION = 2.0
# Width every bandwidth message is right-justified to, and the size passed to
# recv() when reading one back.
BANDWIDTH_PACKET_SIZE = 1024
# Nominal length of a bandwidth run: the client keeps sending for twice this
# long and the background loops are asked to run for 1.5 times this long.
BANDWIDTH_DURATION = 2.0
  30. def task_pidigits():
  31. """Pi calculation (Python)"""
  32. _map = map
  33. _count = itertools.count
  34. _islice = itertools.islice
  35. def calc_ndigits(n):
  36. # From http://shootout.alioth.debian.org/
  37. def gen_x():
  38. return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
  39. def compose(a, b):
  40. aq, ar, as_, at = a
  41. bq, br, bs, bt = b
  42. return (aq * bq,
  43. aq * br + ar * bt,
  44. as_ * bq + at * bs,
  45. as_ * br + at * bt)
  46. def extract(z, j):
  47. q, r, s, t = z
  48. return (q*j + r) // (s*j + t)
  49. def pi_digits():
  50. z = (1, 0, 0, 1)
  51. x = gen_x()
  52. while 1:
  53. y = extract(z, 3)
  54. while y != extract(z, 4):
  55. z = compose(z, next(x))
  56. y = extract(z, 3)
  57. z = compose((10, -10*y, 0, 1), z)
  58. yield y
  59. return list(_islice(pi_digits(), n))
  60. return calc_ndigits, (50, )
  61. def task_regex():
  62. """regular expression (C)"""
  63. # XXX this task gives horrendous latency results.
  64. import re
  65. # Taken from the `inspect` module
  66. pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
  67. with open(__file__, "r") as f:
  68. arg = f.read(2000)
  69. def findall(s):
  70. t = time.time()
  71. try:
  72. return pat.findall(s)
  73. finally:
  74. print(time.time() - t)
  75. return pat.findall, (arg, )
  76. def task_sort():
  77. """list sorting (C)"""
  78. def list_sort(l):
  79. l = l[::-1]
  80. l.sort()
  81. return list_sort, (list(range(1000)), )
  82. def task_compress_zlib():
  83. """zlib compression (C)"""
  84. import zlib
  85. with open(__file__, "rb") as f:
  86. arg = f.read(5000) * 3
  87. def compress(s):
  88. zlib.decompress(zlib.compress(s, 5))
  89. return compress, (arg, )
  90. def task_compress_bz2():
  91. """bz2 compression (C)"""
  92. import bz2
  93. with open(__file__, "rb") as f:
  94. arg = f.read(3000) * 2
  95. def compress(s):
  96. bz2.compress(s)
  97. return compress, (arg, )
  98. def task_hashing():
  99. """SHA1 hashing (C)"""
  100. import hashlib
  101. with open(__file__, "rb") as f:
  102. arg = f.read(5000) * 30
  103. def compute(s):
  104. hashlib.sha1(s).digest()
  105. return compute, (arg, )
# Tasks exercised by the throughput tests; exactly one compression/hashing
# task is appended below, depending on which modules can be imported.
throughput_tasks = [task_pidigits, task_regex]

# Bind ``bz2`` and ``hashlib`` as module globals, storing None in place of any
# module that fails to import so the checks below can test for availability.
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency tests use the very same list object as the throughput tests.
latency_tasks = throughput_tasks
# Background tasks used by the bandwidth tests.
bandwidth_tasks = [task_pidigits]
  123. class TimedLoop:
  124. def __init__(self, func, args):
  125. self.func = func
  126. self.args = args
  127. def __call__(self, start_time, min_duration, end_event, do_yield=False):
  128. step = 20
  129. niters = 0
  130. duration = 0.0
  131. _time = time.time
  132. _sleep = time.sleep
  133. _func = self.func
  134. _args = self.args
  135. t1 = start_time
  136. while True:
  137. for i in range(step):
  138. _func(*_args)
  139. t2 = _time()
  140. # If another thread terminated, the current measurement is invalid
  141. # => return the previous one.
  142. if end_event:
  143. return niters, duration
  144. niters += step
  145. duration = t2 - start_time
  146. if duration >= min_duration:
  147. end_event.append(None)
  148. return niters, duration
  149. if t2 - t1 < 0.01:
  150. # Minimize interference of measurement on overall runtime
  151. step = step * 3 // 2
  152. elif do_yield:
  153. # OS scheduling of Python threads is sometimes so bad that we
  154. # have to force thread switching ourselves, otherwise we get
  155. # completely useless results.
  156. _sleep(0.0001)
  157. t1 = t2
  158. def run_throughput_test(func, args, nthreads):
  159. assert nthreads >= 1
  160. # Warm up
  161. func(*args)
  162. results = []
  163. loop = TimedLoop(func, args)
  164. end_event = []
  165. if nthreads == 1:
  166. # Pure single-threaded performance, without any switching or
  167. # synchronization overhead.
  168. start_time = time.time()
  169. results.append(loop(start_time, THROUGHPUT_DURATION,
  170. end_event, do_yield=False))
  171. return results
  172. started = False
  173. ready_cond = threading.Condition()
  174. start_cond = threading.Condition()
  175. ready = []
  176. def run():
  177. with ready_cond:
  178. ready.append(None)
  179. ready_cond.notify()
  180. with start_cond:
  181. while not started:
  182. start_cond.wait()
  183. results.append(loop(start_time, THROUGHPUT_DURATION,
  184. end_event, do_yield=True))
  185. threads = []
  186. for i in range(nthreads):
  187. threads.append(threading.Thread(target=run))
  188. for t in threads:
  189. t.setDaemon(True)
  190. t.start()
  191. # We don't want measurements to include thread startup overhead,
  192. # so we arrange for timing to start after all threads are ready.
  193. with ready_cond:
  194. while len(ready) < nthreads:
  195. ready_cond.wait()
  196. with start_cond:
  197. start_time = time.time()
  198. started = True
  199. start_cond.notify(nthreads)
  200. for t in threads:
  201. t.join()
  202. return results
  203. def run_throughput_tests(max_threads):
  204. for task in throughput_tasks:
  205. print(task.__doc__)
  206. print()
  207. func, args = task()
  208. nthreads = 1
  209. baseline_speed = None
  210. while nthreads <= max_threads:
  211. results = run_throughput_test(func, args, nthreads)
  212. # Taking the max duration rather than average gives pessimistic
  213. # results rather than optimistic.
  214. speed = sum(r[0] for r in results) / max(r[1] for r in results)
  215. print("threads=%d: %d" % (nthreads, speed), end="")
  216. if baseline_speed is None:
  217. print(" iterations/s.")
  218. baseline_speed = speed
  219. else:
  220. print(" ( %d %%)" % (speed / baseline_speed * 100))
  221. nthreads += 1
  222. print()
  223. LAT_END = "END"
  224. def _sendto(sock, s, addr):
  225. sock.sendto(s.encode('ascii'), addr)
  226. def _recv(sock, n):
  227. return sock.recv(n).decode('ascii')
  228. def latency_client(addr, nb_pings, interval):
  229. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  230. try:
  231. _time = time.time
  232. _sleep = time.sleep
  233. def _ping():
  234. _sendto(sock, "%r\n" % _time(), addr)
  235. # The first ping signals the parent process that we are ready.
  236. _ping()
  237. # We give the parent a bit of time to notice.
  238. _sleep(1.0)
  239. for i in range(nb_pings):
  240. _sleep(interval)
  241. _ping()
  242. _sendto(sock, LAT_END + "\n", addr)
  243. finally:
  244. sock.close()
  245. def run_latency_client(**kwargs):
  246. cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
  247. cmd_line.extend(['--latclient', repr(kwargs)])
  248. return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
  249. #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  250. def run_latency_test(func, args, nthreads):
  251. # Create a listening socket to receive the pings. We use UDP which should
  252. # be painlessly cross-platform.
  253. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  254. sock.bind(("127.0.0.1", 0))
  255. addr = sock.getsockname()
  256. interval = LATENCY_PING_INTERVAL
  257. duration = LATENCY_DURATION
  258. nb_pings = int(duration / interval)
  259. results = []
  260. threads = []
  261. end_event = []
  262. start_cond = threading.Condition()
  263. started = False
  264. if nthreads > 0:
  265. # Warm up
  266. func(*args)
  267. results = []
  268. loop = TimedLoop(func, args)
  269. ready = []
  270. ready_cond = threading.Condition()
  271. def run():
  272. with ready_cond:
  273. ready.append(None)
  274. ready_cond.notify()
  275. with start_cond:
  276. while not started:
  277. start_cond.wait()
  278. loop(start_time, duration * 1.5, end_event, do_yield=False)
  279. for i in range(nthreads):
  280. threads.append(threading.Thread(target=run))
  281. for t in threads:
  282. t.setDaemon(True)
  283. t.start()
  284. # Wait for threads to be ready
  285. with ready_cond:
  286. while len(ready) < nthreads:
  287. ready_cond.wait()
  288. # Run the client and wait for the first ping(s) to arrive before
  289. # unblocking the background threads.
  290. chunks = []
  291. process = run_latency_client(addr=sock.getsockname(),
  292. nb_pings=nb_pings, interval=interval)
  293. s = _recv(sock, 4096)
  294. _time = time.time
  295. with start_cond:
  296. start_time = _time()
  297. started = True
  298. start_cond.notify(nthreads)
  299. while LAT_END not in s:
  300. s = _recv(sock, 4096)
  301. t = _time()
  302. chunks.append((t, s))
  303. # Tell the background threads to stop.
  304. end_event.append(None)
  305. for t in threads:
  306. t.join()
  307. process.wait()
  308. sock.close()
  309. for recv_time, chunk in chunks:
  310. # NOTE: it is assumed that a line sent by a client wasn't received
  311. # in two chunks because the lines are very small.
  312. for line in chunk.splitlines():
  313. line = line.strip()
  314. if line and line != LAT_END:
  315. send_time = eval(line)
  316. assert isinstance(send_time, float)
  317. results.append((send_time, recv_time))
  318. return results
  319. def run_latency_tests(max_threads):
  320. for task in latency_tasks:
  321. print("Background CPU task:", task.__doc__)
  322. print()
  323. func, args = task()
  324. nthreads = 0
  325. while nthreads <= max_threads:
  326. results = run_latency_test(func, args, nthreads)
  327. n = len(results)
  328. # We print out milliseconds
  329. lats = [1000 * (t2 - t1) for (t1, t2) in results]
  330. #print(list(map(int, lats)))
  331. avg = sum(lats) / n
  332. dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
  333. print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
  334. print()
  335. #print(" [... from %d samples]" % n)
  336. nthreads += 1
  337. print()
  338. BW_END = "END"
  339. def bandwidth_client(addr, packet_size, duration):
  340. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  341. sock.bind(("127.0.0.1", 0))
  342. local_addr = sock.getsockname()
  343. _time = time.time
  344. _sleep = time.sleep
  345. def _send_chunk(msg):
  346. _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
  347. # We give the parent some time to be ready.
  348. _sleep(1.0)
  349. try:
  350. start_time = _time()
  351. end_time = start_time + duration * 2.0
  352. i = 0
  353. while _time() < end_time:
  354. _send_chunk(str(i))
  355. s = _recv(sock, packet_size)
  356. assert len(s) == packet_size
  357. i += 1
  358. _send_chunk(BW_END)
  359. finally:
  360. sock.close()
  361. def run_bandwidth_client(**kwargs):
  362. cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
  363. cmd_line.extend(['--bwclient', repr(kwargs)])
  364. return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
  365. #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  366. def run_bandwidth_test(func, args, nthreads):
  367. # Create a listening socket to receive the packets. We use UDP which should
  368. # be painlessly cross-platform.
  369. with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
  370. sock.bind(("127.0.0.1", 0))
  371. addr = sock.getsockname()
  372. duration = BANDWIDTH_DURATION
  373. packet_size = BANDWIDTH_PACKET_SIZE
  374. results = []
  375. threads = []
  376. end_event = []
  377. start_cond = threading.Condition()
  378. started = False
  379. if nthreads > 0:
  380. # Warm up
  381. func(*args)
  382. results = []
  383. loop = TimedLoop(func, args)
  384. ready = []
  385. ready_cond = threading.Condition()
  386. def run():
  387. with ready_cond:
  388. ready.append(None)
  389. ready_cond.notify()
  390. with start_cond:
  391. while not started:
  392. start_cond.wait()
  393. loop(start_time, duration * 1.5, end_event, do_yield=False)
  394. for i in range(nthreads):
  395. threads.append(threading.Thread(target=run))
  396. for t in threads:
  397. t.setDaemon(True)
  398. t.start()
  399. # Wait for threads to be ready
  400. with ready_cond:
  401. while len(ready) < nthreads:
  402. ready_cond.wait()
  403. # Run the client and wait for the first packet to arrive before
  404. # unblocking the background threads.
  405. process = run_bandwidth_client(addr=addr,
  406. packet_size=packet_size,
  407. duration=duration)
  408. _time = time.time
  409. # This will also wait for the parent to be ready
  410. s = _recv(sock, packet_size)
  411. remote_addr = eval(s.partition('#')[0])
  412. with start_cond:
  413. start_time = _time()
  414. started = True
  415. start_cond.notify(nthreads)
  416. n = 0
  417. first_time = None
  418. while not end_event and BW_END not in s:
  419. _sendto(sock, s, remote_addr)
  420. s = _recv(sock, packet_size)
  421. if first_time is None:
  422. first_time = _time()
  423. n += 1
  424. end_time = _time()
  425. end_event.append(None)
  426. for t in threads:
  427. t.join()
  428. process.kill()
  429. return (n - 1) / (end_time - first_time)
  430. def run_bandwidth_tests(max_threads):
  431. for task in bandwidth_tasks:
  432. print("Background CPU task:", task.__doc__)
  433. print()
  434. func, args = task()
  435. nthreads = 0
  436. baseline_speed = None
  437. while nthreads <= max_threads:
  438. results = run_bandwidth_test(func, args, nthreads)
  439. speed = results
  440. #speed = len(results) * 1.0 / results[-1][0]
  441. print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
  442. if baseline_speed is None:
  443. print(" packets/s.")
  444. baseline_speed = speed
  445. else:
  446. print(" ( %d %%)" % (speed / baseline_speed * 100))
  447. nthreads += 1
  448. print()
  449. def main():
  450. usage = "usage: %prog [-h|--help] [options]"
  451. parser = OptionParser(usage=usage)
  452. parser.add_option("-t", "--throughput",
  453. action="store_true", dest="throughput", default=False,
  454. help="run throughput tests")
  455. parser.add_option("-l", "--latency",
  456. action="store_true", dest="latency", default=False,
  457. help="run latency tests")
  458. parser.add_option("-b", "--bandwidth",
  459. action="store_true", dest="bandwidth", default=False,
  460. help="run I/O bandwidth tests")
  461. parser.add_option("-i", "--interval",
  462. action="store", type="int", dest="check_interval", default=None,
  463. help="sys.setcheckinterval() value")
  464. parser.add_option("-I", "--switch-interval",
  465. action="store", type="float", dest="switch_interval", default=None,
  466. help="sys.setswitchinterval() value")
  467. parser.add_option("-n", "--num-threads",
  468. action="store", type="int", dest="nthreads", default=4,
  469. help="max number of threads in tests")
  470. # Hidden option to run the pinging and bandwidth clients
  471. parser.add_option("", "--latclient",
  472. action="store", dest="latclient", default=None,
  473. help=SUPPRESS_HELP)
  474. parser.add_option("", "--bwclient",
  475. action="store", dest="bwclient", default=None,
  476. help=SUPPRESS_HELP)
  477. options, args = parser.parse_args()
  478. if args:
  479. parser.error("unexpected arguments")
  480. if options.latclient:
  481. kwargs = eval(options.latclient)
  482. latency_client(**kwargs)
  483. return
  484. if options.bwclient:
  485. kwargs = eval(options.bwclient)
  486. bandwidth_client(**kwargs)
  487. return
  488. if not options.throughput and not options.latency and not options.bandwidth:
  489. options.throughput = options.latency = options.bandwidth = True
  490. if options.check_interval:
  491. sys.setcheckinterval(options.check_interval)
  492. if options.switch_interval:
  493. sys.setswitchinterval(options.switch_interval)
  494. print("== %s %s (%s) ==" % (
  495. platform.python_implementation(),
  496. platform.python_version(),
  497. platform.python_build()[0],
  498. ))
  499. # Processor identification often has repeated spaces
  500. cpu = ' '.join(platform.processor().split())
  501. print("== %s %s on '%s' ==" % (
  502. platform.machine(),
  503. platform.system(),
  504. cpu,
  505. ))
  506. print()
  507. if options.throughput:
  508. print("--- Throughput ---")
  509. print()
  510. run_throughput_tests(options.nthreads)
  511. if options.latency:
  512. print("--- Latency ---")
  513. print()
  514. run_latency_tests(options.nthreads)
  515. if options.bandwidth:
  516. print("--- I/O bandwidth ---")
  517. print()
  518. run_bandwidth_tests(options.nthreads)
  519. if __name__ == "__main__":
  520. main()