|
|
|
@ -11,6 +11,7 @@ import errno |
|
|
|
import functools |
|
|
|
import socket |
|
|
|
import warnings |
|
|
|
import weakref |
|
|
|
try: |
|
|
|
import ssl |
|
|
|
except ImportError: # pragma: no cover |
|
|
|
@ -64,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
logger.debug('Using selector: %s', selector.__class__.__name__) |
|
|
|
self._selector = selector |
|
|
|
self._make_self_pipe() |
|
|
|
self._transports = weakref.WeakValueDictionary() |
|
|
|
|
|
|
|
def _make_socket_transport(self, sock, protocol, waiter=None, *, |
|
|
|
extra=None, server=None): |
|
|
|
@ -115,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
def _close_self_pipe(self): |
|
|
|
self.remove_reader(self._ssock.fileno()) |
|
|
|
self._remove_reader(self._ssock.fileno()) |
|
|
|
self._ssock.close() |
|
|
|
self._ssock = None |
|
|
|
self._csock.close() |
|
|
|
@ -128,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
self._ssock.setblocking(False) |
|
|
|
self._csock.setblocking(False) |
|
|
|
self._internal_fds += 1 |
|
|
|
self.add_reader(self._ssock.fileno(), self._read_from_self) |
|
|
|
self._add_reader(self._ssock.fileno(), self._read_from_self) |
|
|
|
|
|
|
|
def _process_self_data(self, data): |
|
|
|
pass |
|
|
|
@ -163,8 +165,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
|
|
|
|
def _start_serving(self, protocol_factory, sock, |
|
|
|
sslcontext=None, server=None, backlog=100): |
|
|
|
self.add_reader(sock.fileno(), self._accept_connection, |
|
|
|
protocol_factory, sock, sslcontext, server, backlog) |
|
|
|
self._add_reader(sock.fileno(), self._accept_connection, |
|
|
|
protocol_factory, sock, sslcontext, server, backlog) |
|
|
|
|
|
|
|
def _accept_connection(self, protocol_factory, sock, |
|
|
|
sslcontext=None, server=None, backlog=100): |
|
|
|
@ -194,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
'exception': exc, |
|
|
|
'socket': sock, |
|
|
|
}) |
|
|
|
self.remove_reader(sock.fileno()) |
|
|
|
self._remove_reader(sock.fileno()) |
|
|
|
self.call_later(constants.ACCEPT_RETRY_DELAY, |
|
|
|
self._start_serving, |
|
|
|
protocol_factory, sock, sslcontext, server, |
|
|
|
@ -244,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
context['transport'] = transport |
|
|
|
self.call_exception_handler(context) |
|
|
|
|
|
|
|
def add_reader(self, fd, callback, *args): |
|
|
|
"""Add a reader callback.""" |
|
|
|
def _ensure_fd_no_transport(self, fd): |
|
|
|
try: |
|
|
|
transport = self._transports[fd] |
|
|
|
except KeyError: |
|
|
|
pass |
|
|
|
else: |
|
|
|
if not transport.is_closing(): |
|
|
|
raise RuntimeError( |
|
|
|
'File descriptor {!r} is used by transport {!r}'.format( |
|
|
|
fd, transport)) |
|
|
|
|
|
|
|
def _add_reader(self, fd, callback, *args): |
|
|
|
self._check_closed() |
|
|
|
handle = events.Handle(callback, args, self) |
|
|
|
try: |
|
|
|
@ -260,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
if reader is not None: |
|
|
|
reader.cancel() |
|
|
|
|
|
|
|
def remove_reader(self, fd): |
|
|
|
"""Remove a reader callback.""" |
|
|
|
def _remove_reader(self, fd): |
|
|
|
if self.is_closed(): |
|
|
|
return False |
|
|
|
try: |
|
|
|
@ -282,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
else: |
|
|
|
return False |
|
|
|
|
|
|
|
def add_writer(self, fd, callback, *args): |
|
|
|
"""Add a writer callback..""" |
|
|
|
def _add_writer(self, fd, callback, *args): |
|
|
|
self._check_closed() |
|
|
|
handle = events.Handle(callback, args, self) |
|
|
|
try: |
|
|
|
@ -298,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
if writer is not None: |
|
|
|
writer.cancel() |
|
|
|
|
|
|
|
def remove_writer(self, fd): |
|
|
|
def _remove_writer(self, fd): |
|
|
|
"""Remove a writer callback.""" |
|
|
|
if self.is_closed(): |
|
|
|
return False |
|
|
|
@ -321,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
else: |
|
|
|
return False |
|
|
|
|
|
|
|
def add_reader(self, fd, callback, *args): |
|
|
|
"""Add a reader callback.""" |
|
|
|
self._ensure_fd_no_transport(fd) |
|
|
|
return self._add_reader(fd, callback, *args) |
|
|
|
|
|
|
|
def remove_reader(self, fd): |
|
|
|
"""Remove a reader callback.""" |
|
|
|
self._ensure_fd_no_transport(fd) |
|
|
|
return self._remove_reader(fd) |
|
|
|
|
|
|
|
def add_writer(self, fd, callback, *args): |
|
|
|
"""Add a writer callback..""" |
|
|
|
self._ensure_fd_no_transport(fd) |
|
|
|
return self._add_writer(fd, callback, *args) |
|
|
|
|
|
|
|
def remove_writer(self, fd): |
|
|
|
"""Remove a writer callback.""" |
|
|
|
self._ensure_fd_no_transport(fd) |
|
|
|
return self._remove_writer(fd) |
|
|
|
|
|
|
|
def sock_recv(self, sock, n): |
|
|
|
"""Receive data from the socket. |
|
|
|
|
|
|
|
@ -494,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): |
|
|
|
fileobj, (reader, writer) = key.fileobj, key.data |
|
|
|
if mask & selectors.EVENT_READ and reader is not None: |
|
|
|
if reader._cancelled: |
|
|
|
self.remove_reader(fileobj) |
|
|
|
self._remove_reader(fileobj) |
|
|
|
else: |
|
|
|
self._add_callback(reader) |
|
|
|
if mask & selectors.EVENT_WRITE and writer is not None: |
|
|
|
if writer._cancelled: |
|
|
|
self.remove_writer(fileobj) |
|
|
|
self._remove_writer(fileobj) |
|
|
|
else: |
|
|
|
self._add_callback(writer) |
|
|
|
|
|
|
|
def _stop_serving(self, sock): |
|
|
|
self.remove_reader(sock.fileno()) |
|
|
|
self._remove_reader(sock.fileno()) |
|
|
|
sock.close() |
|
|
|
|
|
|
|
|
|
|
|
@ -539,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin, |
|
|
|
self._closing = False # Set when close() called. |
|
|
|
if self._server is not None: |
|
|
|
self._server._attach() |
|
|
|
loop._transports[self._sock_fd] = self |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
info = [self.__class__.__name__] |
|
|
|
@ -584,10 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin, |
|
|
|
if self._closing: |
|
|
|
return |
|
|
|
self._closing = True |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
if not self._buffer: |
|
|
|
self._conn_lost += 1 |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
self._loop.call_soon(self._call_connection_lost, None) |
|
|
|
|
|
|
|
# On Python 3.3 and older, objects with a destructor part of a reference |
|
|
|
@ -618,10 +649,10 @@ class _SelectorTransport(transports._FlowControlMixin, |
|
|
|
return |
|
|
|
if self._buffer: |
|
|
|
self._buffer.clear() |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
if not self._closing: |
|
|
|
self._closing = True |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
self._conn_lost += 1 |
|
|
|
self._loop.call_soon(self._call_connection_lost, exc) |
|
|
|
|
|
|
|
@ -658,7 +689,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
|
|
|
|
self._loop.call_soon(self._protocol.connection_made, self) |
|
|
|
# only start reading when connection_made() has been called |
|
|
|
self._loop.call_soon(self._loop.add_reader, |
|
|
|
self._loop.call_soon(self._loop._add_reader, |
|
|
|
self._sock_fd, self._read_ready) |
|
|
|
if waiter is not None: |
|
|
|
# only wake up the waiter when connection_made() has been called |
|
|
|
@ -671,7 +702,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
if self._paused: |
|
|
|
raise RuntimeError('Already paused') |
|
|
|
self._paused = True |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
if self._loop.get_debug(): |
|
|
|
logger.debug("%r pauses reading", self) |
|
|
|
|
|
|
|
@ -681,7 +712,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
self._paused = False |
|
|
|
if self._closing: |
|
|
|
return |
|
|
|
self._loop.add_reader(self._sock_fd, self._read_ready) |
|
|
|
self._loop._add_reader(self._sock_fd, self._read_ready) |
|
|
|
if self._loop.get_debug(): |
|
|
|
logger.debug("%r resumes reading", self) |
|
|
|
|
|
|
|
@ -705,7 +736,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
# We're keeping the connection open so the |
|
|
|
# protocol can write more, but we still can't |
|
|
|
# receive more, so remove the reader callback. |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
else: |
|
|
|
self.close() |
|
|
|
|
|
|
|
@ -738,7 +769,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
if not data: |
|
|
|
return |
|
|
|
# Not all was written; register write handler. |
|
|
|
self._loop.add_writer(self._sock_fd, self._write_ready) |
|
|
|
self._loop._add_writer(self._sock_fd, self._write_ready) |
|
|
|
|
|
|
|
# Add it to the buffer. |
|
|
|
self._buffer.extend(data) |
|
|
|
@ -754,7 +785,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
except (BlockingIOError, InterruptedError): |
|
|
|
pass |
|
|
|
except Exception as exc: |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
self._buffer.clear() |
|
|
|
self._fatal_error(exc, 'Fatal write error on socket transport') |
|
|
|
else: |
|
|
|
@ -762,7 +793,7 @@ class _SelectorSocketTransport(_SelectorTransport): |
|
|
|
del self._buffer[:n] |
|
|
|
self._maybe_resume_protocol() # May append to buffer. |
|
|
|
if not self._buffer: |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
if self._closing: |
|
|
|
self._call_connection_lost(None) |
|
|
|
elif self._eof: |
|
|
|
@ -833,19 +864,19 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
try: |
|
|
|
self._sock.do_handshake() |
|
|
|
except ssl.SSLWantReadError: |
|
|
|
self._loop.add_reader(self._sock_fd, |
|
|
|
self._on_handshake, start_time) |
|
|
|
self._loop._add_reader(self._sock_fd, |
|
|
|
self._on_handshake, start_time) |
|
|
|
return |
|
|
|
except ssl.SSLWantWriteError: |
|
|
|
self._loop.add_writer(self._sock_fd, |
|
|
|
self._on_handshake, start_time) |
|
|
|
self._loop._add_writer(self._sock_fd, |
|
|
|
self._on_handshake, start_time) |
|
|
|
return |
|
|
|
except BaseException as exc: |
|
|
|
if self._loop.get_debug(): |
|
|
|
logger.warning("%r: SSL handshake failed", |
|
|
|
self, exc_info=True) |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
self._sock.close() |
|
|
|
self._wakeup_waiter(exc) |
|
|
|
if isinstance(exc, Exception): |
|
|
|
@ -853,8 +884,8 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
else: |
|
|
|
raise |
|
|
|
|
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
|
|
|
|
peercert = self._sock.getpeercert() |
|
|
|
if not hasattr(self._sslcontext, 'check_hostname'): |
|
|
|
@ -882,7 +913,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
|
|
|
|
self._read_wants_write = False |
|
|
|
self._write_wants_read = False |
|
|
|
self._loop.add_reader(self._sock_fd, self._read_ready) |
|
|
|
self._loop._add_reader(self._sock_fd, self._read_ready) |
|
|
|
self._protocol_connected = True |
|
|
|
self._loop.call_soon(self._protocol.connection_made, self) |
|
|
|
# only wake up the waiter when connection_made() has been called |
|
|
|
@ -904,7 +935,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
if self._paused: |
|
|
|
raise RuntimeError('Already paused') |
|
|
|
self._paused = True |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
if self._loop.get_debug(): |
|
|
|
logger.debug("%r pauses reading", self) |
|
|
|
|
|
|
|
@ -914,7 +945,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
self._paused = False |
|
|
|
if self._closing: |
|
|
|
return |
|
|
|
self._loop.add_reader(self._sock_fd, self._read_ready) |
|
|
|
self._loop._add_reader(self._sock_fd, self._read_ready) |
|
|
|
if self._loop.get_debug(): |
|
|
|
logger.debug("%r resumes reading", self) |
|
|
|
|
|
|
|
@ -926,7 +957,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
self._write_ready() |
|
|
|
|
|
|
|
if self._buffer: |
|
|
|
self._loop.add_writer(self._sock_fd, self._write_ready) |
|
|
|
self._loop._add_writer(self._sock_fd, self._write_ready) |
|
|
|
|
|
|
|
try: |
|
|
|
data = self._sock.recv(self.max_size) |
|
|
|
@ -934,8 +965,8 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
pass |
|
|
|
except ssl.SSLWantWriteError: |
|
|
|
self._read_wants_write = True |
|
|
|
self._loop.remove_reader(self._sock_fd) |
|
|
|
self._loop.add_writer(self._sock_fd, self._write_ready) |
|
|
|
self._loop._remove_reader(self._sock_fd) |
|
|
|
self._loop._add_writer(self._sock_fd, self._write_ready) |
|
|
|
except Exception as exc: |
|
|
|
self._fatal_error(exc, 'Fatal read error on SSL transport') |
|
|
|
else: |
|
|
|
@ -960,7 +991,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
self._read_ready() |
|
|
|
|
|
|
|
if not (self._paused or self._closing): |
|
|
|
self._loop.add_reader(self._sock_fd, self._read_ready) |
|
|
|
self._loop._add_reader(self._sock_fd, self._read_ready) |
|
|
|
|
|
|
|
if self._buffer: |
|
|
|
try: |
|
|
|
@ -969,10 +1000,10 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
n = 0 |
|
|
|
except ssl.SSLWantReadError: |
|
|
|
n = 0 |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
self._write_wants_read = True |
|
|
|
except Exception as exc: |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
self._buffer.clear() |
|
|
|
self._fatal_error(exc, 'Fatal write error on SSL transport') |
|
|
|
return |
|
|
|
@ -983,7 +1014,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
self._maybe_resume_protocol() # May append to buffer. |
|
|
|
|
|
|
|
if not self._buffer: |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
if self._closing: |
|
|
|
self._call_connection_lost(None) |
|
|
|
|
|
|
|
@ -1001,7 +1032,7 @@ class _SelectorSslTransport(_SelectorTransport): |
|
|
|
return |
|
|
|
|
|
|
|
if not self._buffer: |
|
|
|
self._loop.add_writer(self._sock_fd, self._write_ready) |
|
|
|
self._loop._add_writer(self._sock_fd, self._write_ready) |
|
|
|
|
|
|
|
# Add it to the buffer. |
|
|
|
self._buffer.extend(data) |
|
|
|
@ -1021,7 +1052,7 @@ class _SelectorDatagramTransport(_SelectorTransport): |
|
|
|
self._address = address |
|
|
|
self._loop.call_soon(self._protocol.connection_made, self) |
|
|
|
# only start reading when connection_made() has been called |
|
|
|
self._loop.call_soon(self._loop.add_reader, |
|
|
|
self._loop.call_soon(self._loop._add_reader, |
|
|
|
self._sock_fd, self._read_ready) |
|
|
|
if waiter is not None: |
|
|
|
# only wake up the waiter when connection_made() has been called |
|
|
|
@ -1071,7 +1102,7 @@ class _SelectorDatagramTransport(_SelectorTransport): |
|
|
|
self._sock.sendto(data, addr) |
|
|
|
return |
|
|
|
except (BlockingIOError, InterruptedError): |
|
|
|
self._loop.add_writer(self._sock_fd, self._sendto_ready) |
|
|
|
self._loop._add_writer(self._sock_fd, self._sendto_ready) |
|
|
|
except OSError as exc: |
|
|
|
self._protocol.error_received(exc) |
|
|
|
return |
|
|
|
@ -1105,6 +1136,6 @@ class _SelectorDatagramTransport(_SelectorTransport): |
|
|
|
|
|
|
|
self._maybe_resume_protocol() # May append to buffer. |
|
|
|
if not self._buffer: |
|
|
|
self._loop.remove_writer(self._sock_fd) |
|
|
|
self._loop._remove_writer(self._sock_fd) |
|
|
|
if self._closing: |
|
|
|
self._call_connection_lost(None) |