|
28 | 28 | Union,
|
29 | 29 | )
|
30 | 30 |
|
31 |
| -from pymongo import _csot, ssl_support |
| 31 | +from pymongo import ssl_support |
32 | 32 | from pymongo._asyncio_task import create_task
|
33 | 33 | from pymongo.errors import _OperationCancelled
|
34 | 34 | from pymongo.socket_checker import _errno_from_exception
|
@@ -316,62 +316,47 @@ async def _async_receive(conn: socket.socket, length: int, loop: AbstractEventLo
|
316 | 316 | return mv
|
317 | 317 |
|
318 | 318 |
|
319 |
| -# Sync version: |
320 |
| -def wait_for_read(conn: Connection, deadline: Optional[float]) -> None: |
321 |
| - """Block until at least one byte is read, or a timeout, or a cancel.""" |
322 |
| - sock = conn.conn |
323 |
| - timed_out = False |
324 |
| - # Check if the connection's socket has been manually closed |
325 |
| - if sock.fileno() == -1: |
326 |
| - return |
327 |
| - while True: |
328 |
| - # SSLSocket can have buffered data which won't be caught by select. |
329 |
| - if hasattr(sock, "pending") and sock.pending() > 0: |
330 |
| - readable = True |
331 |
| - else: |
332 |
| - # Wait up to 500ms for the socket to become readable and then |
333 |
| - # check for cancellation. |
334 |
| - if deadline: |
335 |
| - remaining = deadline - time.monotonic() |
336 |
| - # When the timeout has expired perform one final check to |
337 |
| - # see if the socket is readable. This helps avoid spurious |
338 |
| - # timeouts on AWS Lambda and other FaaS environments. |
339 |
| - if remaining <= 0: |
340 |
| - timed_out = True |
341 |
| - timeout = max(min(remaining, _POLL_TIMEOUT), 0) |
342 |
| - else: |
343 |
| - timeout = _POLL_TIMEOUT |
344 |
| - readable = conn.socket_checker.select(sock, read=True, timeout=timeout) |
345 |
| - if conn.cancel_context.cancelled: |
346 |
| - raise _OperationCancelled("operation cancelled") |
347 |
| - if readable: |
348 |
| - return |
349 |
| - if timed_out: |
350 |
| - raise socket.timeout("timed out") |
351 |
| - |
352 |
| - |
353 | 319 | def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> memoryview:
|
354 | 320 | buf = bytearray(length)
|
355 | 321 | mv = memoryview(buf)
|
356 | 322 | bytes_read = 0
|
357 |
| - while bytes_read < length: |
358 |
| - try: |
359 |
| - wait_for_read(conn, deadline) |
360 |
| - # CSOT: Update timeout. When the timeout has expired perform one |
361 |
| - # final non-blocking recv. This helps avoid spurious timeouts when |
362 |
| - # the response is actually already buffered on the client. |
363 |
| - if _csot.get_timeout() and deadline is not None: |
364 |
| - conn.set_conn_timeout(max(deadline - time.monotonic(), 0)) |
365 |
| - chunk_length = conn.conn.recv_into(mv[bytes_read:]) |
366 |
| - except BLOCKING_IO_ERRORS: |
367 |
| - raise socket.timeout("timed out") from None |
368 |
| - except OSError as exc: |
369 |
| - if _errno_from_exception(exc) == errno.EINTR: |
| 323 | + # To support cancelling a network read, we shorten the socket timeout and |
| 324 | + # check for the cancellation signal after each timeout. Alternatively we |
| 325 | + # could close the socket but that does not reliably cancel recv() calls |
| 326 | + # on all OSes. |
| 327 | + orig_timeout = conn.conn.gettimeout() |
| 328 | + try: |
| 329 | + while bytes_read < length: |
| 330 | + if deadline is not None: |
| 331 | + # CSOT: Update timeout. When the timeout has expired perform one |
| 332 | + # final non-blocking recv. This helps avoid spurious timeouts when |
| 333 | + # the response is actually already buffered on the client. |
| 334 | + short_timeout = min(max(deadline - time.monotonic(), 0), _POLL_TIMEOUT) |
| 335 | + else: |
| 336 | + short_timeout = _POLL_TIMEOUT |
| 337 | + conn.set_conn_timeout(short_timeout) |
| 338 | + try: |
| 339 | + chunk_length = conn.conn.recv_into(mv[bytes_read:]) |
| 340 | + except BLOCKING_IO_ERRORS: |
| 341 | + if conn.cancel_context.cancelled: |
| 342 | + raise _OperationCancelled("operation cancelled") from None |
| 343 | + # We reached the true deadline. |
| 344 | + raise socket.timeout("timed out") from None |
| 345 | + except socket.timeout: |
| 346 | + if conn.cancel_context.cancelled: |
| 347 | + raise _OperationCancelled("operation cancelled") from None |
370 | 348 | continue
|
371 |
| - raise |
372 |
| - if chunk_length == 0: |
373 |
| - raise OSError("connection closed") |
374 |
| - |
375 |
| - bytes_read += chunk_length |
| 349 | + except OSError as exc: |
| 350 | + if conn.cancel_context.cancelled: |
| 351 | + raise _OperationCancelled("operation cancelled") from None |
| 352 | + if _errno_from_exception(exc) == errno.EINTR: |
| 353 | + continue |
| 354 | + raise |
| 355 | + if chunk_length == 0: |
| 356 | + raise OSError("connection closed") |
| 357 | + |
| 358 | + bytes_read += chunk_length |
| 359 | + finally: |
| 360 | + conn.set_conn_timeout(orig_timeout) |
376 | 361 |
|
377 | 362 | return mv
|
0 commit comments