You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

158 lines
5.2 KiB

  1. #
  2. # We use a background thread for sharing fds on Unix, and for sharing sockets on
  3. # Windows.
  4. #
  5. # A client which wants to pickle a resource registers it with the resource
  6. # sharer and gets an identifier in return. The unpickling process will connect
  7. # to the resource sharer, send the identifier and its pid, and then receive
  8. # the resource.
  9. #
  10. import os
  11. import signal
  12. import socket
  13. import sys
  14. import threading
  15. from . import process
  16. from .context import reduction
  17. from . import util
  18. __all__ = ['stop']
  19. if sys.platform == 'win32':
  20. __all__ += ['DupSocket']
  21. class DupSocket(object):
  22. '''Picklable wrapper for a socket.'''
  23. def __init__(self, sock):
  24. new_sock = sock.dup()
  25. def send(conn, pid):
  26. share = new_sock.share(pid)
  27. conn.send_bytes(share)
  28. self._id = _resource_sharer.register(send, new_sock.close)
  29. def detach(self):
  30. '''Get the socket. This should only be called once.'''
  31. with _resource_sharer.get_connection(self._id) as conn:
  32. share = conn.recv_bytes()
  33. return socket.fromshare(share)
  34. else:
  35. __all__ += ['DupFd']
  36. class DupFd(object):
  37. '''Wrapper for fd which can be used at any time.'''
  38. def __init__(self, fd):
  39. new_fd = os.dup(fd)
  40. def send(conn, pid):
  41. reduction.send_handle(conn, new_fd, pid)
  42. def close():
  43. os.close(new_fd)
  44. self._id = _resource_sharer.register(send, close)
  45. def detach(self):
  46. '''Get the fd. This should only be called once.'''
  47. with _resource_sharer.get_connection(self._id) as conn:
  48. return reduction.recv_handle(conn)
  49. class _ResourceSharer(object):
  50. '''Manager for resources using background thread.'''
  51. def __init__(self):
  52. self._key = 0
  53. self._cache = {}
  54. self._old_locks = []
  55. self._lock = threading.Lock()
  56. self._listener = None
  57. self._address = None
  58. self._thread = None
  59. util.register_after_fork(self, _ResourceSharer._afterfork)
  60. def register(self, send, close):
  61. '''Register resource, returning an identifier.'''
  62. with self._lock:
  63. if self._address is None:
  64. self._start()
  65. self._key += 1
  66. self._cache[self._key] = (send, close)
  67. return (self._address, self._key)
  68. @staticmethod
  69. def get_connection(ident):
  70. '''Return connection from which to receive identified resource.'''
  71. from .connection import Client
  72. address, key = ident
  73. c = Client(address, authkey=process.current_process().authkey)
  74. c.send((key, os.getpid()))
  75. return c
  76. def stop(self, timeout=None):
  77. '''Stop the background thread and clear registered resources.'''
  78. from .connection import Client
  79. with self._lock:
  80. if self._address is not None:
  81. c = Client(self._address,
  82. authkey=process.current_process().authkey)
  83. c.send(None)
  84. c.close()
  85. self._thread.join(timeout)
  86. if self._thread.is_alive():
  87. util.sub_warning('_ResourceSharer thread did '
  88. 'not stop when asked')
  89. self._listener.close()
  90. self._thread = None
  91. self._address = None
  92. self._listener = None
  93. for key, (send, close) in self._cache.items():
  94. close()
  95. self._cache.clear()
  96. def _afterfork(self):
  97. for key, (send, close) in self._cache.items():
  98. close()
  99. self._cache.clear()
  100. # If self._lock was locked at the time of the fork, it may be broken
  101. # -- see issue 6721. Replace it without letting it be gc'ed.
  102. self._old_locks.append(self._lock)
  103. self._lock = threading.Lock()
  104. if self._listener is not None:
  105. self._listener.close()
  106. self._listener = None
  107. self._address = None
  108. self._thread = None
  109. def _start(self):
  110. from .connection import Listener
  111. assert self._listener is None, "Already have Listener"
  112. util.debug('starting listener and thread for sending handles')
  113. self._listener = Listener(authkey=process.current_process().authkey)
  114. self._address = self._listener.address
  115. t = threading.Thread(target=self._serve)
  116. t.daemon = True
  117. t.start()
  118. self._thread = t
  119. def _serve(self):
  120. if hasattr(signal, 'pthread_sigmask'):
  121. signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
  122. while 1:
  123. try:
  124. with self._listener.accept() as conn:
  125. msg = conn.recv()
  126. if msg is None:
  127. break
  128. key, destination_pid = msg
  129. send, close = self._cache.pop(key)
  130. try:
  131. send(conn, destination_pid)
  132. finally:
  133. close()
  134. except:
  135. if not util.is_exiting():
  136. sys.excepthook(*sys.exc_info())
  137. _resource_sharer = _ResourceSharer()
  138. stop = _resource_sharer.stop