diff --git a/requirements.txt b/requirements.txt
index e42c1fb7f..f6308044b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,9 +7,9 @@ lxml==4.7.1
 pygments==2.11.0
 python-dateutil==2.8.2
 pyyaml==6.0
-httpx[http2]==0.19.0
+httpx[http2]==0.21.2
 Brotli==1.0.9
 uvloop==0.16.0
-httpx-socks[asyncio]==0.4.1
+httpx-socks[asyncio]==0.7.2
 langdetect==1.0.9
 setproctitle==1.2.2
diff --git a/searx/network/client.py b/searx/network/client.py
index cd1e41460..6858ac05b 100644
--- a/searx/network/client.py
+++ b/searx/network/client.py
@@ -6,8 +6,6 @@ import asyncio
 import logging
 import threading
 
-import anyio
-import httpcore
 import httpx
 from httpx_socks import AsyncProxyTransport
 from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError
@@ -27,31 +25,10 @@ logger = logger.getChild('searx.network.client')
 LOOP = None
 SSLCONTEXTS = {}
 TRANSPORT_KWARGS = {
-    # use anyio :
-    # * https://github.com/encode/httpcore/issues/344
-    # * https://github.com/encode/httpx/discussions/1511
-    'backend': 'anyio',
     'trust_env': False,
 }
 
 
-# pylint: disable=protected-access
-async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
-
-    origin = httpcore._utils.url_to_origin(url)
-    logger.debug('Drop connections for %r', origin)
-    connections_to_close = connection_pool._connections_for_origin(origin)
-    for connection in connections_to_close:
-        await connection_pool._remove_from_pool(connection)
-        try:
-            await connection.aclose()
-        except httpx.NetworkError as e:
-            logger.warning('Error closing an existing connection', exc_info=e)
-
-
-# pylint: enable=protected-access
-
-
 def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
     key = (proxy_url, cert, verify, trust_env, http2)
     if key not in SSLCONTEXTS:
@@ -62,75 +39,25 @@ def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http
 
 
 class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
     """Block HTTP request"""
 
-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
+    async def handle_async_request(self, request):
         raise httpx.UnsupportedProtocol('HTTP protocol is disabled')
 
 
 class AsyncProxyTransportFixed(AsyncProxyTransport):
     """Fix httpx_socks.AsyncProxyTransport
 
-    Map python_socks exceptions to httpx.ProxyError / httpx.ConnectError
-
-    Map socket.gaierror to httpx.ConnectError
-
-    Note: AsyncProxyTransport inherit from AsyncConnectionPool
+    Map python_socks exceptions to httpx.ProxyError exceptions
     """
 
-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(
-                    method, url, headers=headers, stream=stream, extensions=extensions
-                )
-            except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
-                raise httpx.ProxyError from e
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.ConnectError from e
-            except httpx.NetworkError as e:
-                # httpx.WriteError on HTTP/2 connection leaves a new opened stream
-                # then each new request creates a new stream and raise the same WriteError
-                await close_connections_for_url(self, url)
-                raise e
-            except anyio.ClosedResourceError as e:
-                await close_connections_for_url(self, url)
-                raise httpx.CloseError from e
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self, url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
-
-
-class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
-    """Fix httpx.AsyncHTTPTransport"""
-
-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(
-                    method, url, headers=headers, stream=stream, extensions=extensions
-                )
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.ConnectError from e
-            except httpx.NetworkError as e:
-                # httpx.WriteError on HTTP/2 connection leaves a new opened stream
-                # then each new request creates a new stream and raise the same WriteError
-                await close_connections_for_url(self._pool, url)
-                raise e
-            except anyio.ClosedResourceError as e:
-                await close_connections_for_url(self._pool, url)
-                raise httpx.CloseError from e
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self._pool, url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
+    async def handle_async_request(self, request):
+        try:
+            return await super().handle_async_request(request)
+        except ProxyConnectionError as e:
+            raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
+        except ProxyTimeoutError as e:
+            raise httpx.ProxyError("ProxyTimeoutError: " + e.args[0], request=request) from e
+        except ProxyError as e:
+            raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e
 
 
 def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
@@ -157,9 +84,7 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit
         verify=verify,
         http2=http2,
         local_address=local_address,
-        max_connections=limit.max_connections,
-        max_keepalive_connections=limit.max_keepalive_connections,
-        keepalive_expiry=limit.keepalive_expiry,
+        limits=limit,
         retries=retries,
         **TRANSPORT_KWARGS,
     )
@@ -167,13 +92,13 @@ def get_transport(verify, http2, local_address, proxy_url, limit, retries):
     verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
-    return AsyncHTTPTransportFixed(
+    return httpx.AsyncHTTPTransport(
         # pylint: disable=protected-access
         verify=verify,
         http2=http2,
-        local_address=local_address,
-        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
         limits=limit,
+        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
+        local_address=local_address,
         retries=retries,
         **TRANSPORT_KWARGS,
     )
diff --git a/searx/network/network.py b/searx/network/network.py
index 9e14e14bd..43140b44d 100644
--- a/searx/network/network.py
+++ b/searx/network/network.py
@@ -213,15 +213,18 @@ class Network:
         await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
 
     @staticmethod
-    def get_kwargs_clients(kwargs):
+    def extract_kwargs_clients(kwargs):
         kwargs_clients = {}
         if 'verify' in kwargs:
             kwargs_clients['verify'] = kwargs.pop('verify')
         if 'max_redirects' in kwargs:
             kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
+        if 'allow_redirects' in kwargs:
+            # see https://github.com/encode/httpx/pull/1808
+            kwargs['follow_redirects'] = kwargs.pop('allow_redirects')
         return kwargs_clients
 
-    def is_valid_respones(self, response):
+    def is_valid_response(self, response):
         # pylint: disable=too-many-boolean-expressions
         if (
             (self.retry_on_http_error is True and 400 <= response.status_code <= 599)
@@ -231,33 +234,39 @@ class Network:
             return False
         return True
 
-    async def request(self, method, url, **kwargs):
+    async def call_client(self, stream, method, url, **kwargs):
         retries = self.retries
+        was_disconnected = False
+        kwargs_clients = Network.extract_kwargs_clients(kwargs)
         while retries >= 0:  # pragma: no cover
-            kwargs_clients = Network.get_kwargs_clients(kwargs)
             client = await self.get_client(**kwargs_clients)
             try:
-                response = await client.request(method, url, **kwargs)
-                if self.is_valid_respones(response) or retries <= 0:
+                if stream:
+                    response = client.stream(method, url, **kwargs)
+                else:
+                    response = await client.request(method, url, **kwargs)
+                if self.is_valid_response(response) or retries <= 0:
                     return response
+            except httpx.RemoteProtocolError as e:
+                if not was_disconnected:
+                    # the server has closed the connection:
+                    # try again without decreasing the retries variable & with a new HTTP client
+                    was_disconnected = True
+                    await client.aclose()
+                    self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
+                    continue
+                if retries <= 0:
+                    raise e
             except (httpx.RequestError, httpx.HTTPStatusError) as e:
                 if retries <= 0:
                     raise e
             retries -= 1
 
+    async def request(self, method, url, **kwargs):
+        return await self.call_client(False, method, url, **kwargs)
+
     async def stream(self, method, url, **kwargs):
-        retries = self.retries
-        while retries >= 0:  # pragma: no cover
-            kwargs_clients = Network.get_kwargs_clients(kwargs)
-            client = await self.get_client(**kwargs_clients)
-            try:
-                response = client.stream(method, url, **kwargs)
-                if self.is_valid_respones(response) or retries <= 0:
-                    return response
-            except (httpx.RequestError, httpx.HTTPStatusError) as e:
-                if retries <= 0:
-                    raise e
-            retries -= 1
+        return await self.call_client(True, method, url, **kwargs)
 
     @classmethod
     async def aclose_all(cls):
diff --git a/tests/unit/network/test_network.py b/tests/unit/network/test_network.py
index d25a0d77b..4253e69ac 100644
--- a/tests/unit/network/test_network.py
+++ b/tests/unit/network/test_network.py
@@ -76,13 +76,15 @@ class TestNetwork(SearxTestCase):
             'verify': True,
             'max_redirects': 5,
             'timeout': 2,
+            'allow_redirects': True,
         }
-        kwargs_client = Network.get_kwargs_clients(kwargs)
+        kwargs_client = Network.extract_kwargs_clients(kwargs)
 
         self.assertEqual(len(kwargs_client), 2)
-        self.assertEqual(len(kwargs), 1)
+        self.assertEqual(len(kwargs), 2)
 
         self.assertEqual(kwargs['timeout'], 2)
+        self.assertEqual(kwargs['follow_redirects'], True)
 
         self.assertTrue(kwargs_client['verify'])
         self.assertEqual(kwargs_client['max_redirects'], 5)
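
Note on the transport changes in client.py: since httpx 0.20 an async transport
implements handle_async_request(request), receiving a single httpx.Request and
returning an httpx.Response, instead of the old (method, url, headers, stream,
extensions) signature. That is why AsyncHTTPTransportFixed and
close_connections_for_url can be dropped and only the thin python_socks
exception-mapping wrapper remains. A minimal sketch of a custom transport
against this interface (LoggingTransport is a hypothetical name for
illustration, not part of this change):

    import httpx

    class LoggingTransport(httpx.AsyncHTTPTransport):
        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            # the whole request is now available as one object
            print('->', request.method, request.url)
            return await super().handle_async_request(request)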
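
Note on the network.py changes: httpx 0.20 renamed the requests-style
'allow_redirects' option to 'follow_redirects' (encode/httpx#1808), so the
renamed extract_kwargs_clients() now rewrites that key in place while still
splitting out the client-level options. A sketch of the intended behaviour,
assuming the same input dict as the updated unit test:

    kwargs = {'verify': True, 'max_redirects': 5, 'timeout': 2, 'allow_redirects': True}
    kwargs_client = Network.extract_kwargs_clients(kwargs)
    # client-level options are split out and passed to get_client():
    #   kwargs_client == {'verify': True, 'max_redirects': 5}
    # per-request options stay behind, with the flag renamed for httpx >= 0.20:
    #   kwargs == {'timeout': 2, 'follow_redirects': True}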