From d5ef764d3256924b3c71b1cf17703c61a30e96cc Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Fri, 18 Aug 2023 12:16:02 +0000 Subject: [PATCH] searx.network: refactoring * Allow to keep one HTTP client all along during the processing of one user query in one engine. * async code is in one place: client.py, nowhere else. The stream method returns a httpx.Response as expected instead of a tuple as it is now in the master branch. --- .pylintrc | 1 + requirements-dev.txt | 2 +- requirements.txt | 5 +- searx/autocomplete.py | 5 +- searx/network/__init__.py | 697 +++++++++------ searx/network/client.py | 554 +++++++++--- searx/network/context.py | 387 +++++++++ searx/network/network.py | 802 ++++++++++-------- searx/network/raise_for_httperror.py | 4 +- searx/search/__init__.py | 6 +- searx/search/checker/impl.py | 8 +- searx/search/processors/abstract.py | 17 +- searx/search/processors/online.py | 37 +- searx/settings_defaults.py | 6 +- searx/webapp.py | 13 +- .../update/update_ahmia_blacklist.py | 5 +- searxng_extra/update/update_currencies.py | 3 +- .../update/update_engine_descriptions.py | 2 +- searxng_extra/update/update_engine_traits.py | 2 +- searxng_extra/update/update_external_bangs.py | 18 +- .../update/update_firefox_version.py | 5 +- searxng_extra/update/update_osm_keys_tags.py | 11 +- searxng_extra/update/update_wikidata_units.py | 2 + tests/__init__.py | 4 +- tests/unit/network/network_settings.yml | 97 +++ tests/unit/network/test_network.py | 441 +++++++--- tests/unit/network/test_network_settings.py | 31 + 27 files changed, 2281 insertions(+), 884 deletions(-) create mode 100644 searx/network/context.py create mode 100644 tests/unit/network/network_settings.yml create mode 100644 tests/unit/network/test_network_settings.py diff --git a/.pylintrc b/.pylintrc index a5b00227e..55c53a83f 100644 --- a/.pylintrc +++ b/.pylintrc @@ -62,6 +62,7 @@ confidence= disable=duplicate-code, missing-function-docstring, consider-using-f-string, + too-few-public-methods, # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/requirements-dev.txt b/requirements-dev.txt index 6a2d00ac8..b82614830 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,6 @@ mock==5.1.0 nose2[coverage_plugin]==0.14.0 +parameterized==0.9.0 cov-core==1.15.0 black==22.12.0 pylint==3.0.2 @@ -17,7 +18,6 @@ sphinx-autobuild==2021.3.14 sphinx-notfound-page==1.0.0 myst-parser==2.0.0 linuxdoc==20231020 -aiounittest==1.4.2 yamllint==1.32.0 wlc==1.13 coloredlogs==15.0.1 diff --git a/requirements.txt b/requirements.txt index 5a19a723c..f67f1edbd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,10 +7,9 @@ lxml==4.9.3 pygments==2.16.1 python-dateutil==2.8.2 pyyaml==6.0.1 -httpx[http2]==0.24.1 +httpx[http2]==0.25.0 Brotli==1.1.0 -uvloop==0.19.0 -httpx-socks[asyncio]==0.7.7 +httpx-socks==0.7.8 setproctitle==1.3.3 redis==4.6.0 markdown-it-py==3.0.0 diff --git a/searx/autocomplete.py b/searx/autocomplete.py index 58655e26f..f9550c692 100644 --- a/searx/autocomplete.py +++ b/searx/autocomplete.py @@ -16,7 +16,7 @@ from searx.engines import ( engines, google, ) -from searx.network import get as http_get +from searx.network import NETWORKS from searx.exceptions import SearxEngineResponseException @@ -24,7 +24,8 @@ def get(*args, **kwargs): if 'timeout' not in kwargs: kwargs['timeout'] = settings['outgoing']['request_timeout'] kwargs['raise_for_httperror'] = True - return http_get(*args, **kwargs) + network_context = NETWORKS.get('autocomplete').get_context() + return network_context.request('GET', *args, **kwargs) def brave(query, _lang): diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 67028f5f0..dca02f896 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -1,266 +1,483 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # lint: pylint -# pylint: disable=missing-module-docstring, global-statement +# pyright: basic +# pylint: disable=redefined-outer-name +# ^^ because there is the raise_for_httperror function and the raise_for_httperror parameter. +"""HTTP for SearXNG. -import asyncio +In httpx and similar libraries, a client (also named session) contains a pool of HTTP connections. +The client reuses these HTTP connections and automatically recreates them when the server at the other +end closes the connections. Whatever the library, each client uses only one proxy (eventually none) and only +one local IP address. + +SearXNG's primary use case is an engine sending one (or more) outgoing HTTP request(s). The admin can configure +an engine to use multiple proxies and/or IP addresses: SearXNG sends the outgoing HTTP requests through these +different proxies/IP addresses ( = HTTP clients ) on a rotational basis. + +In addition, when SearXNG runs an engine request, there is a hard timeout: the engine runtime must not exceed +a defined value. + +Moreover, an engine can ask SearXNG to retry a failed HTTP request. + +However, we want to keep the engine codes simple and keep the complexity either in the configuration or the +core component components (here, in this module). + +To answer the above requirements, the `searx.network` module introduces three components: +* HTTPClient and TorHTTPClient are two classes that wrap one or multiple httpx.Client +* NetworkManager, a set of named Network. Each Network + * holds the configuration defined in settings.yml + * creates NetworkContext fed with an HTTPClient (or TorHTTPClient). + This is where the rotation between the proxies and IP addresses happens. +* NetworkContext to provide a runtime context for the engines. The constructor needs a global timeout + and an HTTPClient factory. NetworkContext is an abstract class with three implementations, + one for each retry policy. + +It is only possible to send an HTTP request with a NetworkContext +(otherwise, SearXNG raises a NetworkContextNotFound exception). +Two helpers set a NetworkContext for the current thread: + +* The decorator `@provide_networkcontext`, the intended usage is an external script (see searxng_extra) +* The context manager `networkcontext_for_thread`, for the generic use case. + +Inside the thread, the caller can use `searx.network.get`, `searx.network.post` and similar functions without +caring about the HTTP client. However, if the caller creates a new thread, it must initialize a new NetworkContext. +A NetworkContext is most probably thread-safe, but this has not been tested. + +The overall architecture: +* searx.network.network.NETWORKS contains all the networks. + The method `NetworkManager.get(network_name)` returns an initialized Network. +* searx.network.network.Network define one network (a set of proxies, local IP address, etc...). + They are defined in settings.yml. + The method `Network.get_context()` creates a new NetworkContext. +* searx.network.context contains three different implementations of NetworkContext. One for each retry policy. +* searx.network.client.HTTPClient and searx.network.client.TorHTTPClient implements wrappers around httpx.Client +""" import threading -import concurrent.futures -from queue import SimpleQueue -from types import MethodType -from timeit import default_timer -from typing import Iterable, NamedTuple, Tuple, List, Dict, Union from contextlib import contextmanager +from functools import wraps +from typing import Any, Callable, Optional, Union import httpx -import anyio -from .network import get_network, initialize, check_network_configuration # pylint:disable=cyclic-import -from .client import get_loop -from .raise_for_httperror import raise_for_httperror +from searx.network.client import NOTSET, _NotSetClass +from searx.network.context import NetworkContext, P, R +from searx.network.network import NETWORKS +from searx.network.raise_for_httperror import raise_for_httperror + +__all__ = [ + "NETWORKS", + "NetworkContextNotFound", + "networkcontext_for_thread", + "provide_networkcontext", + "raise_for_httperror", + "request", + "get", + "options", + "head", + "post", + "put", + "patch", + "delete", +] -THREADLOCAL = threading.local() -"""Thread-local data is data for thread specific values.""" +_THREADLOCAL = threading.local() +"""Thread-local that contains only one field: network_context.""" + +_NETWORK_CONTEXT_KEY = 'network_context' +"""Key to access _THREADLOCAL""" + +DEFAULT_MAX_REDIRECTS = httpx._config.DEFAULT_MAX_REDIRECTS # pylint: disable=protected-access -def reset_time_for_thread(): - THREADLOCAL.total_time = 0 +class NetworkContextNotFound(Exception): + """A NetworkContext is expected to be set in this thread. - -def get_time_for_thread(): - """returns thread's total time or None""" - return THREADLOCAL.__dict__.get('total_time') - - -def set_timeout_for_thread(timeout, start_time=None): - THREADLOCAL.timeout = timeout - THREADLOCAL.start_time = start_time - - -def set_context_network_name(network_name): - THREADLOCAL.network = get_network(network_name) - - -def get_context_network(): - """If set return thread's network. - - If unset, return value from :py:obj:`get_network`. + Use searx.network.set_context_for_thread or searx.network.context_for_thread + to set a NetworkContext """ - return THREADLOCAL.__dict__.get('network') or get_network() @contextmanager -def _record_http_time(): - # pylint: disable=too-many-branches - time_before_request = default_timer() - start_time = getattr(THREADLOCAL, 'start_time', time_before_request) - try: - yield start_time - finally: - # update total_time. - # See get_time_for_thread() and reset_time_for_thread() - if hasattr(THREADLOCAL, 'total_time'): - time_after_request = default_timer() - THREADLOCAL.total_time += time_after_request - time_before_request +def networkcontext_for_thread( + network_name: Optional[str] = None, timeout: Optional[float] = None, start_time: Optional[float] = None +): + """Context manager to set a NetworkContext for the current thread + The timeout is for the whole function and is infinite by default (None). + The timeout is counted from the current time or start_time if different from None. -def _get_timeout(start_time, kwargs): - # pylint: disable=too-many-branches + Example of usage: - # timeout (httpx) - if 'timeout' in kwargs: - timeout = kwargs['timeout'] - else: - timeout = getattr(THREADLOCAL, 'timeout', None) - if timeout is not None: - kwargs['timeout'] = timeout + ```python + from time import sleep + from searx.network import networkcontext_for_thread, get - # 2 minutes timeout for the requests without timeout - timeout = timeout or 120 + def search(query): + # the timeout is automatically set to 2.0 seconds (the remaining time for the NetworkContext) + # 2.0 because the timeout for the NetworkContext is 3.0 and one second has elllapsed with sleep(1.0) + auckland_time = get("http://worldtimeapi.org/api/timezone/Pacific/Auckland").json() + # the timeout is automatically set to 2.0 - (runtime of the previous HTTP request) + ip_time = get("http://worldtimeapi.org/api/ip").json() + return auckland_time, ip_time - # adjust actual timeout - timeout += 0.2 # overhead - if start_time: - timeout -= default_timer() - start_time - - return timeout - - -def request(method, url, **kwargs): - """same as requests/requests/api.py request(...)""" - with _record_http_time() as start_time: - network = get_context_network() - timeout = _get_timeout(start_time, kwargs) - future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop()) - try: - return future.result(timeout) - except concurrent.futures.TimeoutError as e: - raise httpx.TimeoutException('Timeout', request=None) from e - - -def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]: - """send multiple HTTP requests in parallel. Wait for all requests to finish.""" - with _record_http_time() as start_time: - # send the requests - network = get_context_network() - loop = get_loop() - future_list = [] - for request_desc in request_list: - timeout = _get_timeout(start_time, request_desc.kwargs) - future = asyncio.run_coroutine_threadsafe( - network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop - ) - future_list.append((future, timeout)) - - # read the responses - responses = [] - for future, timeout in future_list: - try: - responses.append(future.result(timeout)) - except concurrent.futures.TimeoutError: - responses.append(httpx.TimeoutException('Timeout', request=None)) - except Exception as e: # pylint: disable=broad-except - responses.append(e) - return responses - - -class Request(NamedTuple): - """Request description for the multi_requests function""" - - method: str - url: str - kwargs: Dict[str, str] = {} - - @staticmethod - def get(url, **kwargs): - return Request('GET', url, kwargs) - - @staticmethod - def options(url, **kwargs): - return Request('OPTIONS', url, kwargs) - - @staticmethod - def head(url, **kwargs): - return Request('HEAD', url, kwargs) - - @staticmethod - def post(url, **kwargs): - return Request('POST', url, kwargs) - - @staticmethod - def put(url, **kwargs): - return Request('PUT', url, kwargs) - - @staticmethod - def patch(url, **kwargs): - return Request('PATCH', url, kwargs) - - @staticmethod - def delete(url, **kwargs): - return Request('DELETE', url, kwargs) - - -def get(url, **kwargs): - kwargs.setdefault('allow_redirects', True) - return request('get', url, **kwargs) - - -def options(url, **kwargs): - kwargs.setdefault('allow_redirects', True) - return request('options', url, **kwargs) - - -def head(url, **kwargs): - kwargs.setdefault('allow_redirects', False) - return request('head', url, **kwargs) - - -def post(url, data=None, **kwargs): - return request('post', url, data=data, **kwargs) - - -def put(url, data=None, **kwargs): - return request('put', url, data=data, **kwargs) - - -def patch(url, data=None, **kwargs): - return request('patch', url, data=data, **kwargs) - - -def delete(url, **kwargs): - return request('delete', url, **kwargs) - - -async def stream_chunk_to_queue(network, queue, method, url, **kwargs): - try: - async with await network.stream(method, url, **kwargs) as response: - queue.put(response) - # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding - # https://www.python-httpx.org/quickstart/#streaming-responses - async for chunk in response.aiter_raw(65536): - if len(chunk) > 0: - queue.put(chunk) - except (httpx.StreamClosed, anyio.ClosedResourceError): - # the response was queued before the exception. - # the exception was raised on aiter_raw. - # we do nothing here: in the finally block, None will be queued - # so stream(method, url, **kwargs) generator can stop - pass - except Exception as e: # pylint: disable=broad-except - # broad except to avoid this scenario: - # exception in network.stream(method, url, **kwargs) - # -> the exception is not catch here - # -> queue None (in finally) - # -> the function below steam(method, url, **kwargs) has nothing to return - queue.put(e) - finally: - queue.put(None) - - -def _stream_generator(method, url, **kwargs): - queue = SimpleQueue() - network = get_context_network() - future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop()) - - # yield chunks - obj_or_exception = queue.get() - while obj_or_exception is not None: - if isinstance(obj_or_exception, Exception): - raise obj_or_exception - yield obj_or_exception - obj_or_exception = queue.get() - future.result() - - -def _close_response_method(self): - asyncio.run_coroutine_threadsafe(self.aclose(), get_loop()) - # reach the end of _self.generator ( _stream_generator ) to an avoid memory leak. - # it makes sure that : - # * the httpx response is closed (see the stream_chunk_to_queue function) - # * to call future.result() in _stream_generator - for _ in self._generator: # pylint: disable=protected-access - continue - - -def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]: - """Replace httpx.stream. - - Usage: - response, stream = poolrequests.stream(...) - for chunk in stream: - ... - - httpx.Client.stream requires to write the httpx.HTTPTransport version of the - the httpx.AsyncHTTPTransport declared above. + # "worldtimeapi" is network defined in settings.yml + # network_context.call might call multiple times the search function, + # however the timeout will be respected. + with networkcontext_for_thread('worldtimeapi', timeout=3.0) as network_context: + sleep(1.0) + auckland_time, ip_time = network_context.call(search(query)) + print("Auckland time: ", auckland_time["datetime"]) + print("My time: ", ip_time["datetime"]) + print("HTTP runtime:", network_context.get_http_runtime()) + ``` """ - generator = _stream_generator(method, url, **kwargs) + network = NETWORKS.get(network_name) + network_context = network.get_context(timeout=timeout, start_time=start_time) + setattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY, network_context) + try: + yield network_context + finally: + delattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY) + del network_context - # yield response - response = next(generator) # pylint: disable=stop-iteration-return - if isinstance(response, Exception): - raise response - response._generator = generator # pylint: disable=protected-access - response.close = MethodType(_close_response_method, response) +def provide_networkcontext( + network_name: Optional[str] = None, timeout: Optional[float] = None, start_time: Optional[float] = None +): + """Set the NetworkContext, then call the wrapped function using searx.network.context.NetworkContext.call - return response, generator + The timeout is for the whole function and is infinite by default (None). + The timeout is counted from the current time or start_time if different from None + + Intended usage: to provide a NetworkContext for scripts in searxng_extra. + + Example of usage: + + ```python + from time import sleep + from searx import network + + @network.provide_networkcontext(timeout=3.0) + def main() + sleep(1.0) + # the timeout is automatically set to 2.0 (the remaining time for the NetworkContext). + my_ip = network.get("https://ifconfig.me/ip").text + print(my_ip) + + if __name__ == '__main__': + main() + ``` + """ + + def func_outer(func: Callable[P, R]): + @wraps(func) + def func_inner(*args: P.args, **kwargs: P.kwargs) -> R: + with networkcontext_for_thread(network_name, timeout, start_time) as network_context: + return network_context.call(func, *args, **kwargs) + + return func_inner + + return func_outer + + +def request( + method: str, + url: str, + params: Optional[httpx._types.QueryParamTypes] = None, + content: Optional[httpx._types.RequestContent] = None, + data: Optional[httpx._types.RequestData] = None, + files: Optional[httpx._types.RequestFiles] = None, + json: Optional[Any] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + timeout: httpx._types.TimeoutTypes = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.request ( https://www.python-httpx.org/api/ ) with some differences: + + * proxies: + it is not available and has to be defined in the Network configuration (in settings.yml) + * cert: + it is not available and is always None. + * trust_env: + it is not available and is always True. + * timeout: + the implementation uses the lowest timeout between this parameter and remaining time for the NetworkContext. + * allow_redirects: + it replaces the follow_redirects parameter to be compatible with the requests API. + * raise_for_httperror: + when True, this function calls searx.network.raise_for_httperror.raise_for_httperror. + + Some parameters from httpx.Client ( https://www.python-httpx.org/api/#client) are available: + + * max_redirects: + Set to None to use the value from the Network configuration. + The maximum number of redirect responses that should be followed. + * verify: + Set to None to use the value from the Network configuration. + * limits: + it has to be defined in the Network configuration (in settings.yml) + * default_encoding: + this parameter is not available and is always "utf-8". + + This function requires a NetworkContext provided by either provide_networkcontext or networkcontext_for_thread. + + The implementation uses one or more httpx.Client + """ + # pylint: disable=too-many-arguments + network_context: Optional[NetworkContext] = getattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY, None) + if network_context is None: + raise NetworkContextNotFound() + http_client = network_context._get_http_client() # pylint: disable=protected-access + return http_client.request( + method, + url, + params=params, + content=content, + data=data, + files=files, + json=json, + headers=headers, + cookies=cookies, + auth=auth, + timeout=timeout, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + raise_for_httperror=raise_for_httperror, + ) + + +def get( + url: str, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = True, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.get, see the request method for the details. + + allow_redirects is by default True (httpx default value is False). + """ + # pylint: disable=too-many-arguments + return request( + "GET", + url, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def options( + url: str, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.options, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "OPTIONS", + url, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def head( + url: str, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.head, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "HEAD", + url, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def post( + url: str, + content: Optional[httpx._types.RequestContent] = None, + data: Optional[httpx._types.RequestData] = None, + files: Optional[httpx._types.RequestFiles] = None, + json: Optional[Any] = None, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.post, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "POST", + url, + content=content, + data=data, + files=files, + json=json, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def put( + url: str, + content: Optional[httpx._types.RequestContent] = None, + data: Optional[httpx._types.RequestData] = None, + files: Optional[httpx._types.RequestFiles] = None, + json: Optional[Any] = None, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.put, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "PUT", + url, + content=content, + data=data, + files=files, + json=json, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def patch( + url: str, + content: Optional[httpx._types.RequestContent] = None, + data: Optional[httpx._types.RequestData] = None, + files: Optional[httpx._types.RequestFiles] = None, + json: Optional[Any] = None, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.patch, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "PATCH", + url, + content=content, + data=data, + files=files, + json=json, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) + + +def delete( + url: str, + params: Optional[httpx._types.QueryParamTypes] = None, + headers: Optional[httpx._types.HeaderTypes] = None, + cookies: Optional[httpx._types.CookieTypes] = None, + auth: Optional[httpx._types.AuthTypes] = None, + allow_redirects: bool = False, + max_redirects: Union[_NotSetClass, int] = NOTSET, + verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET, + timeout: httpx._types.TimeoutTypes = None, + raise_for_httperror: bool = False, +) -> httpx.Response: + """Similar to httpx.delete, see the request method for the details.""" + # pylint: disable=too-many-arguments + return request( + "DELETE", + url, + params=params, + headers=headers, + cookies=cookies, + auth=auth, + allow_redirects=allow_redirects, + max_redirects=max_redirects, + verify=verify, + timeout=timeout, + raise_for_httperror=raise_for_httperror, + ) diff --git a/searx/network/client.py b/searx/network/client.py index 23826c75d..f3bfa14df 100644 --- a/searx/network/client.py +++ b/searx/network/client.py @@ -1,35 +1,65 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # lint: pylint -# pylint: disable=missing-module-docstring, global-statement +# pyright: basic +"""Implement various ABCHTTPClient + +* OneHTTPClient wrapper around httpx.Client +* BaseHTTPClient httpx.Client accept the verify and max_redirects parameter only in the constructor. + BaseHTTPClient allows to pass these parameter in each query by creating multiple OneHTTPClient. +* HTTPClient Inherit from BaseHTTPClient, raise an error according to retry_on_http_error parameter. +* TorHTTPClient Inherit from HTTPClientSoftError, check Tor connectivity +""" -import asyncio import logging import random +from abc import ABC, abstractmethod +from collections import namedtuple from ssl import SSLContext -import threading -from typing import Any, Dict +from typing import Any, Dict, Optional, Tuple, Union import httpx -from httpx_socks import AsyncProxyTransport -from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError +from httpx_socks import SyncProxyTransport +from python_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError, parse_proxy_url -from searx import logger +from .raise_for_httperror import raise_for_httperror -# Optional uvloop (support Python 3.6) -try: - import uvloop -except ImportError: - pass -else: - uvloop.install() +CertTypes = Union[ + # certfile + str, + # (certfile, keyfile) + Tuple[str, Optional[str]], + # (certfile, keyfile, password) + Tuple[str, Optional[str], Optional[str]], +] - -logger = logger.getChild('searx.network.client') -LOOP = None SSLCONTEXTS: Dict[Any, SSLContext] = {} -def shuffle_ciphers(ssl_context): +class _NotSetClass: # pylint: disable=too-few-public-methods + """Internal class for this module, do not create instance of this class. + Replace the None value, allow explicitly pass None as a function argument""" + + +NOTSET = _NotSetClass() + + +class SoftRetryHTTPException(Exception): + """Client implementations raise this exception to tell the NetworkContext + the response is invalid even if there is no HTTP exception. + + This exception is INTERNAL to searx.network and must not be seen outside. + + See HTTPClientSoftError which check the HTTP response according to + the raise_for_httperror parameter. + """ + + def __init__(self, response): + self.response = response + message = "SoftRetryHTTPException, you should not see this error" + super().__init__(message) + + +def _shuffle_ciphers(ssl_context): """Shuffle httpx's default ciphers of a SSL context randomly. From `What Is TLS Fingerprint and How to Bypass It`_ @@ -52,18 +82,28 @@ def shuffle_ciphers(ssl_context): ssl_context.set_ciphers(":".join(sc_list + c_list)) -def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False): - key = (proxy_url, cert, verify, trust_env, http2) +def _get_sslcontexts( + local_address: str, + proxy_url: Optional[str], + cert: Optional[CertTypes], + verify: Union[str, bool], + trust_env: bool, + http2: bool, +): + key = (local_address, proxy_url, cert, verify, trust_env, http2) if key not in SSLCONTEXTS: SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2) - shuffle_ciphers(SSLCONTEXTS[key]) + _shuffle_ciphers(SSLCONTEXTS[key]) return SSLCONTEXTS[key] -class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport): +### Transport + + +class _HTTPTransportNoHttp(httpx.HTTPTransport): """Block HTTP request - The constructor is blank because httpx.AsyncHTTPTransport.__init__ creates an SSLContext unconditionally: + The constructor is blank because httpx.HTTPTransport.__init__ creates an SSLContext unconditionally: https://github.com/encode/httpx/blob/0f61aa58d66680c239ce43c8cdd453e7dc532bfc/httpx/_transports/default.py#L271 Each SSLContext consumes more than 500kb of memory, since there is about one network per engine. @@ -78,33 +118,29 @@ class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport): # this on purpose if the base class is not called pass - async def handle_async_request(self, request): + def handle_request(self, request): raise httpx.UnsupportedProtocol('HTTP protocol is disabled') - async def aclose(self) -> None: + def close(self) -> None: pass - async def __aenter__(self): + def __enter__(self): # Use generics for subclass support. return self - async def __aexit__( - self, - exc_type=None, - exc_value=None, - traceback=None, - ) -> None: + def __exit__(self, exc_type, exc_value, traceback) -> None: # pylint: disable=signature-differs + # avoid to import the various type for the signature, but pylint is not happy pass -class AsyncProxyTransportFixed(AsyncProxyTransport): - """Fix httpx_socks.AsyncProxyTransport +class _CustomSyncProxyTransport(SyncProxyTransport): + """Inherit from httpx_socks.SyncProxyTransport Map python_socks exceptions to httpx.ProxyError exceptions """ - async def handle_async_request(self, request): + def handle_request(self, request): try: - return await super().handle_async_request(request) + return super().handle_request(request) except ProxyConnectionError as e: raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e except ProxyTimeoutError as e: @@ -113,7 +149,7 @@ class AsyncProxyTransportFixed(AsyncProxyTransport): raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e -def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries): +def _get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries): # support socks5h (requests compatibility): # https://requests.readthedocs.io/en/master/user/advanced/#socks # socks5:// hostname is resolved on client side @@ -125,16 +161,17 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit rdns = True proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url) - verify = get_sslcontexts(proxy_url, None, verify, True, http2) if verify is True else verify - return AsyncProxyTransportFixed( + verify = _get_sslcontexts(local_address, proxy_url, None, verify, True, http2) if verify is True else verify + + # About verify: in ProxyTransportFixed, verify is of type httpx._types.VerifyTypes + return _CustomSyncProxyTransport( proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port, username=proxy_username, password=proxy_password, rdns=rdns, - loop=get_loop(), - verify=verify, + verify=verify, # type: ignore http2=http2, local_address=local_address, limits=limit, @@ -142,9 +179,9 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit ) -def get_transport(verify, http2, local_address, proxy_url, limit, retries): - verify = get_sslcontexts(None, None, verify, True, http2) if verify is True else verify - return httpx.AsyncHTTPTransport( +def _get_transport(verify, http2, local_address, proxy_url, limit, retries): + verify = _get_sslcontexts(local_address, None, None, verify, True, http2) if verify is True else verify + return httpx.HTTPTransport( # pylint: disable=protected-access verify=verify, http2=http2, @@ -155,56 +192,378 @@ def get_transport(verify, http2, local_address, proxy_url, limit, retries): ) -def new_client( - # pylint: disable=too-many-arguments - enable_http, - verify, - enable_http2, - max_connections, - max_keepalive_connections, - keepalive_expiry, - proxies, - local_address, - retries, - max_redirects, - hook_log_response, -): - limit = httpx.Limits( - max_connections=max_connections, - max_keepalive_connections=max_keepalive_connections, - keepalive_expiry=keepalive_expiry, - ) - # See https://www.python-httpx.org/advanced/#routing - mounts = {} - for pattern, proxy_url in proxies.items(): - if not enable_http and pattern.startswith('http://'): - continue - if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'): - mounts[pattern] = get_transport_for_socks_proxy( - verify, enable_http2, local_address, proxy_url, limit, retries +### Clients + + +class ABCHTTPClient(ABC): + """Abstract HTTP client + + Multiple implementation are defined bellow. + There are like an onion: each implementation relies on the previous one + and bring new feature. + """ + + @abstractmethod + def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response: + pass + + @abstractmethod + def close(self): + pass + + @property + @abstractmethod + def is_closed(self) -> bool: + pass + + def request(self, method, url, **kwargs) -> httpx.Response: + return self.send(False, method, url, **kwargs) + + def stream(self, method, url, **kwargs) -> httpx.Response: + return self.send(True, method, url, **kwargs) + + +class OneHTTPClient(ABCHTTPClient): + """Wrap a httpx.Client + + Use httpx_socks for socks proxies. + + Deal with httpx.RemoteProtocolError exception: httpx raises this exception when the + HTTP/2 server disconnect. It is excepted to reconnect. + Related to https://github.com/encode/httpx/issues/1478 + Perhaps it can be removed now : TODO check in production. + + To be backward compatible with Request: + + * In Response, "ok" is set to "not response.is_error()" + See https://www.python-httpx.org/compatibility/#checking-for-success-and-failure-responses + + * allow_redirects is accepted + See https://www.python-httpx.org/compatibility/#redirects + """ + + def __init__( + # pylint: disable=too-many-arguments + self, + verify=True, + enable_http=True, + enable_http2=False, + max_connections=None, + max_keepalive_connections=None, + keepalive_expiry=None, + proxies=None, + local_addresses=None, + max_redirects=30, + hook_log_response=None, + log_trace=None, + allow_redirects=True, + logger=None, + ): + self.enable_http = enable_http + self.verify = verify + self.enable_http2 = enable_http2 + self.max_connections = max_connections + self.max_keepalive_connections = max_keepalive_connections + self.keepalive_expiry = keepalive_expiry + self.proxies = proxies or {} + self.local_address = local_addresses + self.max_redirects = max_redirects + self.hook_log_response = hook_log_response + self.allow_redirects = allow_redirects + self.logger = logger + self.extensions = None + if log_trace: + self.extensions = {"trace": log_trace} + self._new_client() + + def send(self, stream, method, url, timeout=None, **kwargs): + self._patch_request(kwargs) + retry = 1 + response = None + while retry >= 0: # pragma: no cover + retry -= 1 + try: + if stream: + # from https://www.python-httpx.org/async/#streaming-responses + # > For situations when context block usage is not practical, + # > it is possible to enter "manual mode" by sending a Request + # > instance using client.send(..., stream=True). + request = self.client.build_request( + method=method, + url=url, + content=kwargs.get("content"), + data=kwargs.get("data"), + files=kwargs.get("files"), + json=kwargs.get("json"), + params=kwargs.get("params"), + headers=kwargs.get("headers"), + cookies=kwargs.get("cookies"), + timeout=timeout, + extensions=self.extensions, + ) + response = self.client.send( + request, + stream=True, + follow_redirects=kwargs.get("follow_redirects", False), + auth=kwargs.get("auth"), + ) + else: + response = self.client.request(method, url, extensions=self.extensions, timeout=timeout, **kwargs) + self._patch_response(response) + return response + except httpx.RemoteProtocolError as e: + if retry >= 0: + # the server has closed the connection: + # try again without decreasing the retries variable & with a new HTTP client + self._reconnect_client() + if self.logger: + self.logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying') + continue + raise e + except (httpx.RequestError, httpx.HTTPStatusError) as e: + raise e + return response # type: ignore + + def close(self): + self.client.close() + + @property + def is_closed(self) -> bool: + return self.client.is_closed + + def _new_client(self): + limit = httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry, + ) + # See https://www.python-httpx.org/advanced/#routing + mounts = {} + for pattern, proxy_url in self.proxies.items(): + if not self.enable_http and pattern.startswith('http://'): + continue + if ( + proxy_url.startswith('socks4://') + or proxy_url.startswith('socks5://') + or proxy_url.startswith('socks5h://') + ): + mounts[pattern] = _get_transport_for_socks_proxy( + self.verify, self.enable_http2, self.local_address, proxy_url, limit, 0 + ) + else: + mounts[pattern] = _get_transport( + self.verify, self.enable_http2, self.local_address, proxy_url, limit, 0 + ) + + if not self.enable_http: + mounts['http://'] = _HTTPTransportNoHttp() + + transport = _get_transport(self.verify, self.enable_http2, self.local_address, None, limit, 0) + + event_hooks = None + if self.hook_log_response: + event_hooks = {'response': [self.hook_log_response]} + self.client = httpx.Client( + transport=transport, + mounts=mounts, + max_redirects=self.max_redirects, + event_hooks=event_hooks, + ) + + def _reconnect_client(self): + self.client.close() + self._new_client() + + def _patch_request(self, kwargs): + # see https://www.python-httpx.org/compatibility/#redirects + follow_redirects = self.allow_redirects + if 'allow_redirects' in kwargs: + # see https://github.com/encode/httpx/pull/1808 + follow_redirects = kwargs.pop('allow_redirects') + kwargs['follow_redirects'] = follow_redirects + + def _patch_response(self, response): + if isinstance(response, httpx.Response): + # requests compatibility (response is not streamed) + # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses + response.ok = not response.is_error # type: ignore + + return response + + +_HTTPMultiClientConf = namedtuple('HTTPMultiClientConf', ['verify', 'max_redirects']) + + +class BaseHTTPClient(ABCHTTPClient): + """Some parameter like verify, max_redirects are defined at the client level, + not at the request level. + + This class allow to specify these parameters at the request level. + The implementation uses multiple instances of OneHTTPClient + + This class does not deal with the retry_on_http_error parameter + """ + + def __init__( + self, + **kwargs, + ): + # set the default values + self.default = _HTTPMultiClientConf(True, 30) + # extract the values from the HTTPCient constructor + # the line before is mandatory to be able to self._extract_kwargs_clients + # and keep the other arguments + self.default, self.kwargs = self._extract_kwargs_clients(kwargs) + self.clients: Dict[Tuple, OneHTTPClient] = {} + + def close(self): + for client in self.clients.values(): + client.close() + + @property + def is_closed(self) -> bool: + return all(client.is_closed for client in self.clients.values()) + + def send(self, stream, method, url, timeout=None, **kwargs): + client, kwargs = self._get_client_and_update_kwargs(**kwargs) + return client.send(stream, method, url, timeout, **kwargs) + + def _get_client_and_update_kwargs(self, **kwargs) -> Tuple[OneHTTPClient, Dict]: + # extract HTTPMultiClientConf using the parameter in the request + # and fallback to the parameters defined in the constructor + # = the parameters set in the network settings + kwargs_clients, kwargs = self._extract_kwargs_clients(kwargs) + if kwargs_clients not in self.clients: + self.clients[kwargs_clients] = OneHTTPClient( + verify=kwargs_clients.verify, + max_redirects=kwargs_clients.max_redirects, + **self.kwargs, ) + return self.clients[kwargs_clients], kwargs + + def _extract_kwargs_clients(self, kwargs) -> Tuple[_HTTPMultiClientConf, Dict]: + # default values + # see https://www.python-httpx.org/compatibility/#ssl-configuration + verify = kwargs.pop('verify', NOTSET) + max_redirects = kwargs.pop('max_redirects', NOTSET) + if verify == NOTSET: + verify = self.default.verify + if max_redirects == NOTSET: + max_redirects = self.default.max_redirects + return _HTTPMultiClientConf(verify, max_redirects), kwargs + + +class HTTPClient(BaseHTTPClient): + """Inherit from BaseHTTPClient, raise an exception according to the retry_on_http_error parameter""" + + def __init__(self, retry_on_http_error=None, **kwargs): + super().__init__(**kwargs) + self.retry_on_http_error = retry_on_http_error + self._check_configuration() + + def _check_configuration(self): + # make sure we can create at least an OneHTTPClient without exception + self._get_client_and_update_kwargs() + + def send(self, stream, method, url, timeout=None, **kwargs): + try: + do_raise_for_httperror = self._extract_do_raise_for_httperror(kwargs) + response = super().send(stream, method, url, timeout=timeout, **kwargs) + if do_raise_for_httperror: + raise_for_httperror(response) + if self._is_error_but_retry(response): + raise SoftRetryHTTPException(response) + return response + except (httpx.RequestError, httpx.HTTPStatusError) as e: + raise e + + def _is_error_but_retry(self, response): + # pylint: disable=too-many-boolean-expressions + if ( + (self.retry_on_http_error is True and 400 <= response.status_code <= 599) + or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error) + or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error) + ): + return True + return False + + @staticmethod + def _extract_do_raise_for_httperror(kwargs): + do_raise_for_httperror = True + if 'raise_for_httperror' in kwargs: + do_raise_for_httperror = kwargs['raise_for_httperror'] + del kwargs['raise_for_httperror'] + return do_raise_for_httperror + + def __repr__(self): + keys_values = " ".join([f"{k}={v!r}" for k, v in self.kwargs.items()]) + return f"<{self.__class__.__name__} retry_on_http_error={self.retry_on_http_error!r} {keys_values}>" + + +class TorHTTPClient(HTTPClient): + """Extend HTTPClientSoftError client. To use with Tor configuration. + + The class checks if the client is really connected through Tor. + """ + + _TOR_CHECK_RESULT = {} + + def __init__(self, proxies=None, local_addresses=None, **kwargs): + self.proxies = proxies + self.local_addresses = local_addresses + super().__init__(proxies=proxies, local_addresses=local_addresses, **kwargs) + + def _check_configuration(self): + if not self._is_connected_through_tor(self.proxies, self.local_addresses): + self.close() + raise httpx.HTTPError('Network configuration problem: not using Tor') + + def _is_connected_through_tor(self, proxies, local_addresses) -> bool: + """TODO : rewrite to check the proxies variable instead of checking the HTTPTransport ?""" + if proxies is None: + return False + + cache_key = (local_addresses, tuple(proxies.items())) + if cache_key in TorHTTPClient._TOR_CHECK_RESULT: + return TorHTTPClient._TOR_CHECK_RESULT[cache_key] + + # False is the client use the DNS from the proxy + use_local_dns = False + + # get one httpx client through get_client_and_update_kwargs + one_http_client, _ = self._get_client_and_update_kwargs(verify=True) + httpx_client = one_http_client.client + # ignore client._transport because it is not used with all:// + for transport in httpx_client._mounts.values(): # pylint: disable=protected-access + if isinstance(transport, _HTTPTransportNoHttp): + # ignore the NO HTTP transport + continue + if isinstance(transport, _CustomSyncProxyTransport) and not getattr( + transport._pool, "_rdns", False # pylint: disable=protected-access # type: ignore + ): + # socks5:// with local DNS + # expect socks5h:// with remote DNS to resolve .onion domain. + use_local_dns = True + break + + # + if use_local_dns: + # no test + result = False else: - mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries) + # actual check + response = one_http_client.request("GET", "https://check.torproject.org/api/ip", timeout=60) + if response.status_code != 200: + result = False + else: + result = bool(response.json().get("IsTor", False)) + TorHTTPClient._TOR_CHECK_RESULT[cache_key] = result + return result - if not enable_http: - mounts['http://'] = AsyncHTTPTransportNoHttp() - - transport = get_transport(verify, enable_http2, local_address, None, limit, retries) - - event_hooks = None - if hook_log_response: - event_hooks = {'response': [hook_log_response]} - - return httpx.AsyncClient( - transport=transport, - mounts=mounts, - max_redirects=max_redirects, - event_hooks=event_hooks, - ) - - -def get_loop(): - return LOOP + @staticmethod + def _clear_cache(): + """Only for the tests""" + TorHTTPClient._TOR_CHECK_RESULT = {} def init(): @@ -220,18 +579,5 @@ def init(): ): logging.getLogger(logger_name).setLevel(logging.WARNING) - # loop - def loop_thread(): - global LOOP - LOOP = asyncio.new_event_loop() - LOOP.run_forever() - - thread = threading.Thread( - target=loop_thread, - name='asyncio_loop', - daemon=True, - ) - thread.start() - init() diff --git a/searx/network/context.py b/searx/network/context.py new file mode 100644 index 000000000..b480749bf --- /dev/null +++ b/searx/network/context.py @@ -0,0 +1,387 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pyright: basic +"""This module implements various NetworkContext which deals with + +* retry strategies: what to do when an HTTP request fails and retries>0 +* record HTTP runtime +* timeout: In engines, the user query starts at one point in time, + the engine timeout is the starting point plus a defined value. + NetworkContext sends HTTP requests following the request timeout and the engine timeouts. + +Example of usage: + +``` +context = NetworkContextRetryFunction(...) # or another implementation + +def my_engine(): + http_client = context.get_http_client() + ip_ifconfig = http_client.request("GET", "https://ifconfig.me/") + print("ip from ifconfig.me ", ip_ifconfig) + ip_myip = http_client.request("GET", "https://api.myip.com").json()["ip"] + print("ip from api.myip.com", ip_myip) + assert ip_ifconfig == ip_myip + # ^^ always true with NetworkContextRetryFunction and NetworkContextRetrySameHTTPClient + +result = context.call(my_engine) +print('HTTP runtime:', context.get_total_time()) +``` + +Note in the code above NetworkContextRetryFunction is instanced directly for the sake of simplicity. +NetworkContext are actually instanciated using Network.get_context(...) + +Various implementations define what to do when there is an exception in the function `my_engine`: + +* `NetworkContextRetryFunction` gets another HTTP client and tries the whole function again. +* `NetworkContextRetryDifferentHTTPClient` gets another HTTP client and tries the query again. +* `NetworkContextRetrySameHTTPClient` tries the query again with the same HTTP client. +""" +import functools +import ssl +from abc import ABC, abstractmethod +from contextlib import contextmanager +from timeit import default_timer +from typing import Callable, Optional, final + +try: + from typing import ParamSpec, TypeVar +except ImportError: + # to support Python < 3.10 + from typing_extensions import ParamSpec, TypeVar + +import httpx + +from searx.network.client import ABCHTTPClient, SoftRetryHTTPException + +P = ParamSpec('P') +R = TypeVar('R') +HTTPCLIENTFACTORY = Callable[[], ABCHTTPClient] + +DEFAULT_TIMEOUT = 120.0 + + +## NetworkContext + + +class NetworkContext(ABC): + """Abstract implementation: the call must defined in concrete classes. + + Lifetime: one engine request or initialization of an engine. + """ + + __slots__ = ('_retries', '_http_client_factory', '_http_client', '_start_time', '_http_time', '_timeout') + + def __init__( + self, + retries: int, + http_client_factory: HTTPCLIENTFACTORY, + start_time: Optional[float], + timeout: Optional[float], + ): + self._retries: int = retries + # wrap http_client_factory here, so we can forget about this wrapping + self._http_client_factory = _TimeHTTPClientWrapper.wrap_factory(http_client_factory, self) + self._http_client: Optional[ABCHTTPClient] = None + self._start_time: float = start_time or default_timer() + self._http_time: float = 0.0 + self._timeout: Optional[float] = timeout + + @abstractmethod + def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R: + """Call func within the network context. + + The retry policy might call func multiple times. + + Within the function self.get_http_client() returns an HTTP client to use. + + The retry policy might send multiple times the same HTTP request + until it works or the retry count falls to zero. + """ + + @final + def request(self, *args, **kwargs): + """Convenient method to wrap a call to request inside the call method. + + Use a new HTTP client to wrap a call to the request method using self.call + """ + self._set_http_client() + return self.call(self._get_http_client().request, *args, **kwargs) + + @final + def stream(self, *args, **kwargs): + """Convenient method to wrap a call to stream inside the call method. + + Use a new HTTP client to wrap a call to the stream method using self.call + """ + self._set_http_client() + return self.call(self._get_http_client().stream, *args, **kwargs) + + @final + def get_http_runtime(self) -> Optional[float]: + """Return the amount of time spent on HTTP requests""" + return self._http_time + + @final + def get_remaining_time(self, _override_timeout: Optional[float] = None) -> float: + """Return the remaining time for the context. + + _override_timeout is not intended to be used outside this module. + """ + timeout = _override_timeout or self._timeout or DEFAULT_TIMEOUT + timeout += 0.2 # overhead + timeout -= default_timer() - self._start_time + return timeout + + @final + def _get_http_client(self) -> ABCHTTPClient: + """Return the HTTP client to use for this context.""" + if self._http_client is None: + raise ValueError("HTTP client has not been set") + return self._http_client + + @final + def _set_http_client(self): + """Ask the NetworkContext to use another HTTP client using the factory. + + Use the method _get_new_client_from_factory() to call the factory, + so the NetworkContext implementations can wrap the ABCHTTPClient. + """ + self._http_client = self._get_new_client_from_factory() + + @final + def _reset_http_client(self): + self._http_client = None + + def _get_new_client_from_factory(self): + return self._http_client_factory() + + @contextmanager + def _record_http_time(self): + """This decorator records the code's runtime and adds it to self.total_time""" + time_before_request = default_timer() + try: + yield + finally: + self._http_time += default_timer() - time_before_request + + def __repr__(self): + common_attributes = ( + f"{self.__class__.__name__}" + + f" retries={self._retries!r} timeout={self._timeout!r} http_client={self._http_client!r}" + ) + # get the original factory : see the __init__ method of this class and _TimeHTTPClientWrapper.wrap_factory + factory = self._http_client_factory.__wrapped__ + # break the abstraction safely: get back the Network object through the bound method + # see Network.get_context + bound_instance = getattr(factory, "__self__", None) + if bound_instance is not None and hasattr(bound_instance, 'get_context'): + # bound_instance has a "get_context" attribute: this is most likely a Network + # searx.network.network.Network is not imported to avoid circular import + return f"<{common_attributes} network_context={factory.__self__!r}>" + # fallback : this instance was not created using Network.get_context + return f"<{common_attributes} http_client_factory={factory!r}>" + + +## Measure time and deal with timeout + + +class _TimeHTTPClientWrapper(ABCHTTPClient): + """Wrap an ABCHTTPClient: + * to record the HTTP runtime + * to override the timeout to make sure the total time does not exceed the timeout set on the NetworkContext + """ + + __slots__ = ('http_client', 'network_context') + + @staticmethod + def wrap_factory(http_client_factory: HTTPCLIENTFACTORY, network_context: NetworkContext): + """Return a factory which wraps the result of http_client_factory with _TimeHTTPClientWrapper instance.""" + functools.wraps(http_client_factory) + + def wrapped_factory(): + return _TimeHTTPClientWrapper(http_client_factory(), network_context) + + wrapped_factory.__wrapped__ = http_client_factory + return wrapped_factory + + def __init__(self, http_client: ABCHTTPClient, network_context: NetworkContext) -> None: + self.http_client = http_client + self.network_context = network_context + + def send(self, stream, method, url, **kwargs) -> httpx.Response: + """Send the HTTP request using self.http_client + + Inaccurate with stream: the method must record HTTP time with the close method of httpx.Response. + It is not a problem since stream are used only for the image proxy. + """ + with self.network_context._record_http_time(): # pylint: disable=protected-access + timeout = self._extract_timeout(kwargs) + return self.http_client.send(stream, method, url, timeout=timeout, **kwargs) + + def close(self): + return self.http_client.close() + + @property + def is_closed(self) -> bool: + return self.http_client.is_closed + + def _extract_timeout(self, kwargs): + """Extract the timeout parameter and adjust it according to the remaining time""" + timeout = kwargs.pop('timeout', None) + return self.network_context.get_remaining_time(timeout) + + +## NetworkContextRetryFunction + + +class NetworkContextRetryFunction(NetworkContext): + """When an HTTP request fails, this NetworkContext tries again + the whole function with another HTTP client. + + This guarantees that func has the same HTTP client all along. + """ + + def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R: + try: + while self._retries >= 0 and self.get_remaining_time() > 0: + self._set_http_client() + try: + return func(*args, **kwargs) # type: ignore + except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError, SoftRetryHTTPException) as e: + if self._retries <= 0: + # raise the exception only there is no more try + raise e + self._retries -= 1 + raise httpx.TimeoutException("Timeout") + finally: + self._reset_http_client() + + def _get_new_client_from_factory(self): + return _RetryFunctionHTTPClient(super()._get_new_client_from_factory(), self) + + +class _RetryFunctionHTTPClient(ABCHTTPClient): + """Companion class of NetworkContextRetryFunction + + Do one thing: if the retries count of the NetworkContext is zero and there is a SoftRetryHTTPException, + then the send method catch this exception and returns the HTTP response. + This make sure the SoftRetryHTTPException exception is not seen outside the searx.network module. + """ + + def __init__(self, http_client: ABCHTTPClient, network_context: NetworkContextRetryFunction): + self.http_client = http_client + self.network_context = network_context + + def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response: + try: + return self.http_client.send(stream, method, url, **kwargs) + except SoftRetryHTTPException as e: + if self.network_context._retries <= 0: # pylint: disable=protected-access + return e.response + raise e + + def close(self): + return self.http_client.close() + + @property + def is_closed(self) -> bool: + return self.http_client.is_closed + + +## NetworkContextRetrySameHTTPClient + + +class NetworkContextRetrySameHTTPClient(NetworkContext): + """When an HTTP request fails, this NetworkContext tries again + the same HTTP request with the same HTTP client + + The implementation wraps the provided ABCHTTPClient with _RetrySameHTTPClient.""" + + def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R: + try: + self._set_http_client() + return func(*args, **kwargs) # type: ignore + finally: + self._reset_http_client() + + def _get_new_client_from_factory(self): + return _RetrySameHTTPClient(super()._get_new_client_from_factory(), self) + + +class _RetrySameHTTPClient(ABCHTTPClient): + """Companion class of NetworkContextRetrySameHTTPClient""" + + def __init__(self, http_client: ABCHTTPClient, network_content: NetworkContextRetrySameHTTPClient): + self.http_client = http_client + self.network_content = network_content + + def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response: + retries = self.network_content._retries # pylint: disable=protected-access + while retries >= 0 and self.network_content.get_remaining_time() > 0: + try: + return self.http_client.send(stream, method, url, **kwargs) + except SoftRetryHTTPException as e: + if retries <= 0: + return e.response + except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError) as e: + if retries <= 0: + raise e + retries -= 1 + return self.http_client.send(stream, method, url, **kwargs) + + def close(self): + return self.http_client.close() + + @property + def is_closed(self) -> bool: + return self.http_client.is_closed + + +## NetworkContextRetryDifferentHTTPClient + + +class NetworkContextRetryDifferentHTTPClient(NetworkContext): + """When a HTTP request fails, this NetworkContext tries again + the same HTTP request with a different HTTP client + + The implementation wraps the provided ABCHTTPClient with _RetryDifferentHTTPClient.""" + + def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R: + self._set_http_client() + try: + return func(*args, **kwargs) # type: ignore + finally: + self._reset_http_client() + + def _get_new_client_from_factory(self): + return _RetryDifferentHTTPClient(self) + + +class _RetryDifferentHTTPClient(ABCHTTPClient): + """Companion class of NetworkContextRetryDifferentHTTPClient""" + + def __init__(self, network_context: NetworkContextRetryDifferentHTTPClient) -> None: + self.network_context = network_context + + def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response: + retries = self.network_context._retries # pylint: disable=protected-access + while retries >= 0 and self.network_context.get_remaining_time() > 0: + http_client = self.network_context._http_client_factory() # pylint: disable=protected-access + try: + return http_client.send(stream, method, url, **kwargs) + except SoftRetryHTTPException as e: + if retries <= 0: + return e.response + except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError) as e: + if retries <= 0: + raise e + retries -= 1 + http_client = self.network_context._http_client_factory() # pylint: disable=protected-access + return http_client.send(stream, method, url, **kwargs) + + def close(self): + raise NotImplementedError() + + @property + def is_closed(self) -> bool: + raise NotImplementedError() diff --git a/searx/network/network.py b/searx/network/network.py index 3ab0bff3a..0f7f4ac8d 100644 --- a/searx/network/network.py +++ b/searx/network/network.py @@ -1,156 +1,227 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # lint: pylint -# pylint: disable=global-statement -# pylint: disable=missing-module-docstring, missing-class-docstring +# pylint: disable=missing-class-docstring +# pyright: basic +"""Deal with + +* create Networks from settings.yml +* each Network contains an ABCHTTPClient for each (proxies, IP addresses). Lazy initialized. +* a Network provides two methods: + + * get_http_client: returns an HTTP client. Prefer the get_context, + retry strategy is ignored with get_http_client + * get_context: provides a runtime context for the engine, see searx.network.context +""" -import atexit -import asyncio import ipaddress +from dataclasses import dataclass, field +from enum import Enum from itertools import cycle -from typing import Dict +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union import httpx from searx import logger, searx_debug -from .client import new_client, get_loop, AsyncHTTPTransportNoHttp -from .raise_for_httperror import raise_for_httperror - +from searx.network.client import HTTPClient, TorHTTPClient +from searx.network.context import ( + NetworkContext, + NetworkContextRetryDifferentHTTPClient, + NetworkContextRetryFunction, + NetworkContextRetrySameHTTPClient, +) logger = logger.getChild('network') -DEFAULT_NAME = '__DEFAULT__' -NETWORKS: Dict[str, 'Network'] = {} -# requests compatibility when reading proxy settings from settings.yml -PROXY_PATTERN_MAPPING = { - 'http': 'http://', - 'https': 'https://', - 'socks4': 'socks4://', - 'socks5': 'socks5://', - 'socks5h': 'socks5h://', - 'http:': 'http://', - 'https:': 'https://', - 'socks4:': 'socks4://', - 'socks5:': 'socks5://', - 'socks5h:': 'socks5h://', -} -ADDRESS_MAPPING = {'ipv4': '0.0.0.0', 'ipv6': '::'} + +class RetryStrategy(Enum): + ENGINE = NetworkContextRetryFunction + SAME_HTTP_CLIENT = NetworkContextRetrySameHTTPClient + DIFFERENT_HTTP_CLIENT = NetworkContextRetryDifferentHTTPClient + + +TYPE_IP_ANY = Union[ # pylint: disable=invalid-name + ipaddress.IPv4Address, + ipaddress.IPv6Address, + ipaddress.IPv4Network, + ipaddress.IPv6Network, +] + +TYPE_RETRY_ON_ERROR = Union[List[int], int, bool] # pylint: disable=invalid-name + + +@dataclass(order=True, frozen=True) +class NetworkSettings: + """Configuration for a Network. See NetworkSettingsReader""" + + # Individual HTTP requests can override these parameters. + verify: bool = True + max_redirects: int = 30 + # These parameters can not be overridden. + enable_http: bool = False + enable_http2: bool = True + max_connections: Optional[int] = 10 + max_keepalive_connections: Optional[int] = 100 + keepalive_expiry: Optional[float] = 5.0 + local_addresses: List[TYPE_IP_ANY] = field(default_factory=list) + proxies: Dict[str, List[str]] = field(default_factory=dict) + using_tor_proxy: bool = False + retries: int = 0 + retry_strategy: RetryStrategy = RetryStrategy.DIFFERENT_HTTP_CLIENT + retry_on_http_error: Optional[TYPE_RETRY_ON_ERROR] = None + logger_name: Optional[str] = None class Network: + """Provides NetworkContext and ABCHTTPClient following NetworkSettings. + + A Network might have multiple IP addresses and proxies; + in this case, each call to get_context or get_http_client provides a different + configuration. + """ __slots__ = ( - 'enable_http', - 'verify', - 'enable_http2', - 'max_connections', - 'max_keepalive_connections', - 'keepalive_expiry', - 'local_addresses', - 'proxies', - 'using_tor_proxy', - 'max_redirects', - 'retries', - 'retry_on_http_error', + '_settings', '_local_addresses_cycle', '_proxies_cycle', '_clients', '_logger', ) - _TOR_CHECK_RESULT = {} + def __init__(self, settings: NetworkSettings): + """Creates a Network from a NetworkSettings""" + self._settings = settings + self._local_addresses_cycle = self._get_local_addresses_cycle() + self._proxies_cycle = self._get_proxy_cycles() + self._clients: Dict[Tuple, HTTPClient] = {} + self._logger = logger.getChild(settings.logger_name) if settings.logger_name else logger - def __init__( - # pylint: disable=too-many-arguments - self, - enable_http=True, - verify=True, - enable_http2=False, - max_connections=None, - max_keepalive_connections=None, - keepalive_expiry=None, - proxies=None, - using_tor_proxy=False, - local_addresses=None, - retries=0, - retry_on_http_error=None, - max_redirects=30, - logger_name=None, - ): + @staticmethod + def from_dict(**kwargs): + """Creates a Network from a keys/values""" + return Network(NetwortSettingsDecoder.from_dict(kwargs)) - self.enable_http = enable_http - self.verify = verify - self.enable_http2 = enable_http2 - self.max_connections = max_connections - self.max_keepalive_connections = max_keepalive_connections - self.keepalive_expiry = keepalive_expiry - self.proxies = proxies - self.using_tor_proxy = using_tor_proxy - self.local_addresses = local_addresses - self.retries = retries - self.retry_on_http_error = retry_on_http_error - self.max_redirects = max_redirects - self._local_addresses_cycle = self.get_ipaddress_cycle() - self._proxies_cycle = self.get_proxy_cycles() - self._clients = {} - self._logger = logger.getChild(logger_name) if logger_name else logger - self.check_parameters() + def close(self): + """Close all the ABCHTTPClient hold by the Network""" + for client in self._clients.values(): + client.close() - def check_parameters(self): - for address in self.iter_ipaddresses(): - if '/' in address: - ipaddress.ip_network(address, False) - else: - ipaddress.ip_address(address) + def check_configuration(self) -> bool: + """Check if the network configuration is valid. - if self.proxies is not None and not isinstance(self.proxies, (str, dict)): - raise ValueError('proxies type has to be str, dict or None') + Typical use case: check if the proxy is really a Tor proxy""" + try: + self._get_http_client() + return True + except Exception: # pylint: disable=broad-except + self._logger.exception('Error') + return False - def iter_ipaddresses(self): - local_addresses = self.local_addresses - if not local_addresses: - return - if isinstance(local_addresses, str): - local_addresses = [local_addresses] - for address in local_addresses: - yield address + def get_context(self, timeout: Optional[float] = None, start_time: Optional[float] = None) -> NetworkContext: + """Return a new NetworkContext""" + context_cls = self._settings.retry_strategy.value + return context_cls(self._settings.retries, self._get_http_client, start_time, timeout) - def get_ipaddress_cycle(self): + def _get_http_client(self) -> HTTPClient: + """Return an HTTP client. + + Different HTTP clients are returned according to the configuration. + + For example, if two proxies are defined, + the first call to this function returns an HTTP client using the first proxy. + A second call returns an HTTP client using the second proxy. + A third call returns the same HTTP client from the first call, using the first proxy. + """ + local_addresses = next(self._local_addresses_cycle) + proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key + key = (local_addresses, proxies) + if key not in self._clients or self._clients[key].is_closed: + http_client_cls = TorHTTPClient if self._settings.using_tor_proxy else HTTPClient + hook_log_response = self._log_response if searx_debug else None + log_trace = self._log_trace if searx_debug else None + self._clients[key] = http_client_cls( + verify=self._settings.verify, + enable_http=self._settings.enable_http, + enable_http2=self._settings.enable_http2, + max_connections=self._settings.max_connections, + max_keepalive_connections=self._settings.max_keepalive_connections, + keepalive_expiry=self._settings.keepalive_expiry, + proxies=dict(proxies), + local_addresses=local_addresses, + retry_on_http_error=self._settings.retry_on_http_error, + hook_log_response=hook_log_response, + log_trace=log_trace, + logger=self._logger, + ) + return self._clients[key] + + def _get_local_addresses_cycle(self): + """Never-ending generator of IP addresses""" while True: - count = 0 - for address in self.iter_ipaddresses(): - if '/' in address: - for a in ipaddress.ip_network(address, False).hosts(): + at_least_one = False + for address in self._settings.local_addresses: + if isinstance(address, (ipaddress.IPv4Network, ipaddress.IPv6Network)): + for a in address.hosts(): yield str(a) - count += 1 + at_least_one = True else: - a = ipaddress.ip_address(address) - yield str(a) - count += 1 - if count == 0: + yield str(address) + at_least_one = True + if not at_least_one: + # IPv4Network.hosts() and IPv6Network.hosts() might never return an IP address. + # at_least_one makes sure the generator does not turn into infinite loop without yield yield None - def iter_proxies(self): - if not self.proxies: - return - # https://www.python-httpx.org/compatibility/#proxy-keys - if isinstance(self.proxies, str): - yield 'all://', [self.proxies] - else: - for pattern, proxy_url in self.proxies.items(): - pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern) - if isinstance(proxy_url, str): - proxy_url = [proxy_url] - yield pattern, proxy_url + def _get_proxy_cycles(self): + """Never-ending generator of proxy configurations. - def get_proxy_cycles(self): - proxy_settings = {} - for pattern, proxy_urls in self.iter_proxies(): - proxy_settings[pattern] = cycle(proxy_urls) + Each iteration returns tuples of tuples. + Semantically, this is a dictionary where + * keys are the mount points (see https://www.python-httpx.org/advanced/#mounting-transports ) + * values are the proxy URLs. + + This private method returns a tuple instead of a dictionary to be hashable. + See the line `key = (local_addresses, proxies)` above. + + For example, if settings.yml contains: + ```yaml + proxies: socks5h://localhost:1337 + ``` + + This is equivalent to + ```yaml + proxies: + - all://: socks5h://localhost:1337 + ``` + + And this method always returns: + * `(('all://', 'socks5h://localhost:1337'))` + + Another example: + + ```yaml + proxies: + - all://: socks5h://localhost:1337 + - https://bing.com: + - socks5h://localhost:4000 + - socks5h://localhost:5000 + ``` + + In this example, this method alternately returns these two responses: + + * `(('all://', 'socks5h://localhost:1337'), ('https://bing.com', 'socks5h://localhost:4000'))` + * `(('all://', 'socks5h://localhost:1337'), ('https://bing.com', 'socks5h://localhost:5000'))` + + When no proxies are configured, this method returns an empty tuple at each iteration. + """ + # for each pattern, turn each list of proxy into a cycle + proxy_settings = {pattern: cycle(proxy_urls) for pattern, proxy_urls in (self._settings.proxies).items()} while True: # pylint: disable=stop-iteration-return + # ^^ is it a pylint bug ? yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items()) - async def log_response(self, response: httpx.Response): + def _log_response(self, response: httpx.Response): + """Logs from httpx are disabled. Log the HTTP response with the logger from the network""" request = response.request status = f"{response.status_code} {response.reason_phrase}" response_line = f"{response.http_version} {status}" @@ -158,270 +229,273 @@ class Network: content_type = f' ({content_type})' if content_type else '' self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}') - @staticmethod - async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool: - if proxies in Network._TOR_CHECK_RESULT: - return Network._TOR_CHECK_RESULT[proxies] + def _log_trace(self, name: str, info: Mapping[str, Any]) -> None: + """Log the actual source / dest IPs and SSL cipher. - result = True - # ignore client._transport because it is not used with all:// - for transport in client._mounts.values(): # pylint: disable=protected-access - if isinstance(transport, AsyncHTTPTransportNoHttp): - continue - if getattr(transport, "_pool") and getattr( - transport._pool, "_rdns", False # pylint: disable=protected-access - ): - continue - return False - response = await client.get("https://check.torproject.org/api/ip", timeout=60) - if not response.json()["IsTor"]: - result = False - Network._TOR_CHECK_RESULT[proxies] = result - return result + Note: does not work with socks proxy - async def get_client(self, verify=None, max_redirects=None): - verify = self.verify if verify is None else verify - max_redirects = self.max_redirects if max_redirects is None else max_redirects - local_address = next(self._local_addresses_cycle) - proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key - key = (verify, max_redirects, local_address, proxies) - hook_log_response = self.log_response if searx_debug else None - if key not in self._clients or self._clients[key].is_closed: - client = new_client( - self.enable_http, - verify, - self.enable_http2, - self.max_connections, - self.max_keepalive_connections, - self.keepalive_expiry, - dict(proxies), - local_address, - 0, - max_redirects, - hook_log_response, - ) - if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies): - await client.aclose() - raise httpx.ProxyError('Network configuration problem: not using Tor') - self._clients[key] = client - return self._clients[key] + See + * https://www.encode.io/httpcore/extensions/ + * https://github.com/encode/httpx/blob/e874351f04471029b2c5dcb2d0b50baccc7b9bc0/httpx/_main.py#L207 + """ + if name == "connection.connect_tcp.complete": + stream = info["return_value"] + server_addr = stream.get_extra_info("server_addr") + client_addr = stream.get_extra_info("client_addr") + self._logger.debug(f"* Connected from {client_addr[0]!r} to {server_addr[0]!r} on port {server_addr[1]}") + elif name == "connection.start_tls.complete": # pragma: no cover + stream = info["return_value"] + ssl_object = stream.get_extra_info("ssl_object") + version = ssl_object.version() + cipher = ssl_object.cipher() + alpn = ssl_object.selected_alpn_protocol() + self._logger.debug(f"* SSL established using {version!r} / {cipher[0]!r}, ALPN protocol: {alpn!r}") + elif name == "http2.send_request_headers.started": + self._logger.debug(f"* HTTP/2 stream_id: {info['stream_id']}") - async def aclose(self): - async def close_client(client): - try: - await client.aclose() - except httpx.HTTPError: - pass - - await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False) - - @staticmethod - def extract_kwargs_clients(kwargs): - kwargs_clients = {} - if 'verify' in kwargs: - kwargs_clients['verify'] = kwargs.pop('verify') - if 'max_redirects' in kwargs: - kwargs_clients['max_redirects'] = kwargs.pop('max_redirects') - if 'allow_redirects' in kwargs: - # see https://github.com/encode/httpx/pull/1808 - kwargs['follow_redirects'] = kwargs.pop('allow_redirects') - return kwargs_clients - - @staticmethod - def extract_do_raise_for_httperror(kwargs): - do_raise_for_httperror = True - if 'raise_for_httperror' in kwargs: - do_raise_for_httperror = kwargs['raise_for_httperror'] - del kwargs['raise_for_httperror'] - return do_raise_for_httperror - - @staticmethod - def patch_response(response, do_raise_for_httperror): - if isinstance(response, httpx.Response): - # requests compatibility (response is not streamed) - # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses - response.ok = not response.is_error - - # raise an exception - if do_raise_for_httperror: - raise_for_httperror(response) - - return response - - def is_valid_response(self, response): - # pylint: disable=too-many-boolean-expressions - if ( - (self.retry_on_http_error is True and 400 <= response.status_code <= 599) - or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error) - or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error) - ): - return False - return True - - async def call_client(self, stream, method, url, **kwargs): - retries = self.retries - was_disconnected = False - do_raise_for_httperror = Network.extract_do_raise_for_httperror(kwargs) - kwargs_clients = Network.extract_kwargs_clients(kwargs) - while retries >= 0: # pragma: no cover - client = await self.get_client(**kwargs_clients) - try: - if stream: - response = client.stream(method, url, **kwargs) - else: - response = await client.request(method, url, **kwargs) - if self.is_valid_response(response) or retries <= 0: - return Network.patch_response(response, do_raise_for_httperror) - except httpx.RemoteProtocolError as e: - if not was_disconnected: - # the server has closed the connection: - # try again without decreasing the retries variable & with a new HTTP client - was_disconnected = True - await client.aclose() - self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying') - continue - if retries <= 0: - raise e - except (httpx.RequestError, httpx.HTTPStatusError) as e: - if retries <= 0: - raise e - retries -= 1 - - async def request(self, method, url, **kwargs): - return await self.call_client(False, method, url, **kwargs) - - async def stream(self, method, url, **kwargs): - return await self.call_client(True, method, url, **kwargs) - - @classmethod - async def aclose_all(cls): - await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False) + def __repr__(self): + return f"<{self.__class__.__name__} logger_name={self._settings.logger_name!r}>" -def get_network(name=None): - return NETWORKS.get(name or DEFAULT_NAME) +class NetwortSettingsDecoder: + """Convert a description of a network in settings.yml to a NetworkSettings instance""" - -def check_network_configuration(): - async def check(): - exception_count = 0 - for network in NETWORKS.values(): - if network.using_tor_proxy: - try: - await network.get_client() - except Exception: # pylint: disable=broad-except - network._logger.exception('Error') # pylint: disable=protected-access - exception_count += 1 - return exception_count - - future = asyncio.run_coroutine_threadsafe(check(), get_loop()) - exception_count = future.result() - if exception_count > 0: - raise RuntimeError("Invalid network configuration") - - -def initialize(settings_engines=None, settings_outgoing=None): - # pylint: disable=import-outside-toplevel) - from searx.engines import engines - from searx import settings - - # pylint: enable=import-outside-toplevel) - - settings_engines = settings_engines or settings['engines'] - settings_outgoing = settings_outgoing or settings['outgoing'] - - # default parameters for AsyncHTTPTransport - # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # pylint: disable=line-too-long - default_params = { - 'enable_http': False, - 'verify': settings_outgoing['verify'], - 'enable_http2': settings_outgoing['enable_http2'], - 'max_connections': settings_outgoing['pool_connections'], - 'max_keepalive_connections': settings_outgoing['pool_maxsize'], - 'keepalive_expiry': settings_outgoing['keepalive_expiry'], - 'local_addresses': settings_outgoing['source_ips'], - 'using_tor_proxy': settings_outgoing['using_tor_proxy'], - 'proxies': settings_outgoing['proxies'], - 'max_redirects': settings_outgoing['max_redirects'], - 'retries': settings_outgoing['retries'], - 'retry_on_http_error': None, + # requests compatibility when reading proxy settings from settings.yml + PROXY_PATTERN_MAPPING = { + 'http': 'http://', + 'https': 'https://', + 'socks4': 'socks4://', + 'socks5': 'socks5://', + 'socks5h': 'socks5h://', + 'http:': 'http://', + 'https:': 'https://', + 'socks4:': 'socks4://', + 'socks5:': 'socks5://', + 'socks5h:': 'socks5h://', } - def new_network(params, logger_name=None): - nonlocal default_params - result = {} - result.update(default_params) - result.update(params) - if logger_name: - result['logger_name'] = logger_name - return Network(**result) + @classmethod + def from_dict(cls, network_settings: Dict[str, Any]) -> NetworkSettings: + # Decode the parameters that require it; the other parameters are left as they are + decoders = { + "proxies": cls._decode_proxies, + "local_addresses": cls._decode_local_addresses, + "retry_strategy": cls._decode_retry_strategy, + } + for key, decode_func in decoders.items(): + if key not in network_settings: + continue + if network_settings[key] is None: + # None is seen as not set: rely on the default values from NetworkSettings + del network_settings[key] + else: + network_settings[key] = decode_func(network_settings[key]) + # Relies on the default values of NetworkSettings for unset parameters + return NetworkSettings(**network_settings) - def iter_networks(): - nonlocal settings_engines + @classmethod + def _decode_proxies(cls, proxies) -> Dict[str, List[str]]: + if isinstance(proxies, str): + # for example: + # proxies: socks5://localhost:8000 + proxies = {'all://': [proxies]} + elif isinstance(proxies, list): + # for example: + # proxies: + # - socks5h://localhost:8000 + # - socks5h://localhost:8001 + proxies = {'all://': proxies} + + if not isinstance(proxies, dict): + raise ValueError('proxies type has to be str, list, dict or None') + + # Here we are sure to have + # proxies = { + # pattern: a_value + # } + # with a_value that can be either a string or a list. + # Now, we make sure that a_value is always a list of strings. + # Also, we keep compatibility with requests regarding the patterns: + # see https://www.python-httpx.org/compatibility/#proxy-keys + result = {} + for pattern, proxy_list in proxies.items(): + pattern = cls.PROXY_PATTERN_MAPPING.get(pattern, pattern) + if isinstance(proxy_list, str): + proxy_list = [proxy_list] + if not isinstance(proxy_list, list): + raise ValueError('proxy list') + for proxy in proxy_list: + if not isinstance(proxy, str): + raise ValueError(f'{repr(proxy)} : an URL is expected') + result[pattern] = proxy_list + return result + + @staticmethod + def _decode_local_addresses(ip_addresses: Union[str, List[str]]) -> List[TYPE_IP_ANY]: + if isinstance(ip_addresses, str): + ip_addresses = [ip_addresses] + + if not isinstance(ip_addresses, list): + raise ValueError('IP address must be either None or a string or a list of strings') + + # check IP address syntax + result = [] + for address in ip_addresses: + if not isinstance(address, str): + raise ValueError(f'An {address!r} must be an IP address written as a string') + if '/' in address: + result.append(ipaddress.ip_network(address, False)) + else: + result.append(ipaddress.ip_address(address)) + return result + + @staticmethod + def _decode_retry_strategy(retry_strategy: str) -> RetryStrategy: + for member in RetryStrategy: + if member.name.lower() == retry_strategy.lower(): + return member + raise ValueError(f"{retry_strategy} is not a RetryStrategy") + + +class NetworkManager: + """Contains all the Network instances. + + By default, there is one default network, so searx.network. + """ + + DEFAULT_NAME = '__DEFAULT__' + + def __init__(self): + # Create a default network so scripts in searxng_extra don't have load settings.yml + self.networks: Dict[str, Network] = {NetworkManager.DEFAULT_NAME: Network.from_dict()} + + def get(self, name: Optional[str] = None): + return self.networks[name or NetworkManager.DEFAULT_NAME] + + def initialize_from_settings(self, settings_engines, settings_outgoing, check=True): + # pylint: disable=too-many-branches + from searx.engines import engines # pylint: disable=import-outside-toplevel + + # Default parameters for HTTPTransport + # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # pylint: disable=line-too-long + default_network_settings = { + 'verify': settings_outgoing['verify'], + 'enable_http': settings_outgoing['enable_http'], + 'enable_http2': settings_outgoing['enable_http2'], + 'max_connections': settings_outgoing['pool_connections'], # different because of historical reason + 'max_keepalive_connections': settings_outgoing['pool_maxsize'], # different because of historical reason + 'keepalive_expiry': settings_outgoing['keepalive_expiry'], + 'max_redirects': settings_outgoing['max_redirects'], + 'retries': settings_outgoing['retries'], + 'proxies': settings_outgoing['proxies'], + 'local_addresses': settings_outgoing['source_ips'], # different because of historical reason + 'using_tor_proxy': settings_outgoing['using_tor_proxy'], + 'retry_on_http_error': None, + } + + def new_network(network_settings: Dict[str, Any], logger_name: Optional[str] = None): + nonlocal default_network_settings + result = {} + result.update(default_network_settings) + result.update(network_settings) + if logger_name: + result['logger_name'] = logger_name + return Network.from_dict(**result) + + # ipv4 and ipv6 are always defined + self.networks = { + NetworkManager.DEFAULT_NAME: new_network({}, logger_name='default'), + 'ipv4': new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4'), + 'ipv6': new_network({'local_addresses': '::'}, logger_name='ipv6'), + } + + # define networks from outgoing.networks. Example of configuration: + # + # outgoing: + # networks: + # my_proxy: + # proxies: http://localhost:1337 + # + for network_name, network_dict in settings_outgoing['networks'].items(): + self.networks[network_name] = new_network(network_dict, logger_name=network_name) + + # Get the engine network settings directly from the engine modules and settings.yml (not as NetworkSettings) + engine_network_dict_settings = {} for engine_spec in settings_engines: engine_name = engine_spec['name'] engine = engines.get(engine_name) if engine is None: continue + engine_network_dict_settings[engine_name] = self._get_engine_network_settings( + engine_name, engine, default_network_settings + ) + + # Define networks from engines.[i].network (except references) + for engine_name, network_dict in engine_network_dict_settings.items(): + if isinstance(network_dict, dict): + self.networks[engine_name] = new_network(network_dict, logger_name=engine_name) + + # Define networks from engines.[i].network (only references) + for engine_name, network_dict in engine_network_dict_settings.items(): + if isinstance(network_dict, str): + self.networks[engine_name] = self.networks[network_dict] + + # The /image_proxy endpoint has a dedicated network using the same parameters + # as the default network, but HTTP/2 is disabled. It decreases the CPU load average, + # and the total time is more or less the same. + if 'image_proxy' not in self.networks: + image_proxy_params = default_network_settings.copy() + image_proxy_params['enable_http2'] = False + self.networks['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy') + + # Define a network the autocompletion + if 'autocomplete' not in self.networks: + self.networks['autocomplete'] = new_network(default_network_settings, logger_name='autocomplete') + + # Check if each network is valid: + # * one HTTP client is instantiated + # --> Tor connectivity is checked if using_tor_proxy is True + if check: + exception_count = 0 + for network in self.networks.values(): + if not network.check_configuration(): + exception_count += 1 + if exception_count > 0: + raise RuntimeError("Invalid network configuration") + + @staticmethod + def _get_engine_network_settings(engine_name, engine, default_network_settings): + if hasattr(engine, 'network'): + # The network configuration is defined in settings.yml inside a network key. + # For example: + # + # - name: arxiv + # engine: arxiv + # shortcut: arx + # network: + # http2: false + # proxies: socks5h://localhost:1337 + # network = getattr(engine, 'network', None) - yield engine_name, engine, network - - if NETWORKS: - done() - NETWORKS.clear() - NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default') - NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4') - NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6') - - # define networks from outgoing.networks - for network_name, network in settings_outgoing['networks'].items(): - NETWORKS[network_name] = new_network(network, logger_name=network_name) - - # define networks from engines.[i].network (except references) - for engine_name, engine, network in iter_networks(): - if network is None: - network = {} - for attribute_name, attribute_value in default_params.items(): - if hasattr(engine, attribute_name): - network[attribute_name] = getattr(engine, attribute_name) - else: - network[attribute_name] = attribute_value - NETWORKS[engine_name] = new_network(network, logger_name=engine_name) - elif isinstance(network, dict): - NETWORKS[engine_name] = new_network(network, logger_name=engine_name) - - # define networks from engines.[i].network (references) - for engine_name, engine, network in iter_networks(): - if isinstance(network, str): - NETWORKS[engine_name] = NETWORKS[network] - - # the /image_proxy endpoint has a dedicated network. - # same parameters than the default network, but HTTP/2 is disabled. - # It decreases the CPU load average, and the total time is more or less the same - if 'image_proxy' not in NETWORKS: - image_proxy_params = default_params.copy() - image_proxy_params['enable_http2'] = False - NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy') + if not isinstance(network, (dict, str)): + raise ValueError(f'Engine {engine_name}: network must be a dictionnary or string') + return network + # The network settings are mixed with the other engine settings. + # The code checks if the keys from default_network_settings are defined in the engine module + # + # For example: + # + # - name: arxiv + # engine: arxiv + # shortcut: arx + # http2: false + # proxies: socks5h://localhost:1337 + # + return { + attribute_name: getattr(engine, attribute_name) + for attribute_name in default_network_settings.keys() + if hasattr(engine, attribute_name) + } -@atexit.register -def done(): - """Close all HTTP client - - Avoid a warning at exit - See https://github.com/encode/httpx/pull/2026 - - Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__ - So Network.aclose is called here using atexit.register - """ - try: - loop = get_loop() - if loop: - future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), loop) - # wait 3 seconds to close the HTTP clients - future.result(3) - finally: - NETWORKS.clear() - - -NETWORKS[DEFAULT_NAME] = Network() +NETWORKS = NetworkManager() diff --git a/searx/network/raise_for_httperror.py b/searx/network/raise_for_httperror.py index 9f847d436..8900afc67 100644 --- a/searx/network/raise_for_httperror.py +++ b/searx/network/raise_for_httperror.py @@ -4,12 +4,12 @@ """ +from searx import get_setting from searx.exceptions import ( + SearxEngineAccessDeniedException, SearxEngineCaptchaException, SearxEngineTooManyRequestsException, - SearxEngineAccessDeniedException, ) -from searx import get_setting def is_cloudflare_challenge(resp): diff --git a/searx/search/__init__.py b/searx/search/__init__.py index fcdc1f72f..7092d35e8 100644 --- a/searx/search/__init__.py +++ b/searx/search/__init__.py @@ -19,7 +19,7 @@ from searx import logger from searx.plugins import plugins from searx.search.models import EngineRef, SearchQuery from searx.engines import load_engines -from searx.network import initialize as initialize_network, check_network_configuration +from searx.network import NETWORKS from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time from searx.search.processors import PROCESSORS, initialize as initialize_processors from searx.search.checker import initialize as initialize_checker @@ -31,9 +31,7 @@ logger = logger.getChild('search') def initialize(settings_engines=None, enable_checker=False, check_network=False, enable_metrics=True): settings_engines = settings_engines or settings['engines'] load_engines(settings_engines) - initialize_network(settings_engines, settings['outgoing']) - if check_network: - check_network_configuration() + NETWORKS.initialize_from_settings(settings_engines, settings['outgoing'], check=check_network) initialize_metrics([engine['name'] for engine in settings_engines], enable_metrics) initialize_processors(settings_engines) if enable_checker: diff --git a/searx/search/checker/impl.py b/searx/search/checker/impl.py index 34fd9afd8..5e039c573 100644 --- a/searx/search/checker/impl.py +++ b/searx/search/checker/impl.py @@ -12,7 +12,8 @@ from urllib.parse import urlparse import re import httpx -from searx import network, logger +from searx import logger +from searx.network import NETWORKS from searx.utils import gen_useragent, detect_language from searx.results import ResultContainer from searx.search.models import SearchQuery, EngineRef @@ -72,8 +73,8 @@ def _download_and_check_if_image(image_url: str) -> bool: a = time() try: # use "image_proxy" (avoid HTTP/2) - network.set_context_network_name('image_proxy') - r, stream = network.stream( + network_context = NETWORKS.get('image_proxy').get_context() + r = network_context.stream( 'GET', image_url, timeout=10.0, @@ -96,7 +97,6 @@ def _download_and_check_if_image(image_url: str) -> bool: else: is_image = False del r - del stream return is_image except httpx.TimeoutException: logger.error('Timeout for %s: %i', image_url, int(time() - a)) diff --git a/searx/search/processors/abstract.py b/searx/search/processors/abstract.py index 0cabec97a..80087c6ab 100644 --- a/searx/search/processors/abstract.py +++ b/searx/search/processors/abstract.py @@ -12,7 +12,7 @@ from typing import Dict, Union from searx import settings, logger from searx.engines import engines -from searx.network import get_time_for_thread, get_network +from searx.network import NETWORKS from searx.metrics import histogram_observe, counter_inc, count_exception, count_error from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException from searx.utils import get_engine_from_settings @@ -66,7 +66,7 @@ class EngineProcessor(ABC): self.engine = engine self.engine_name = engine_name self.logger = engines[engine_name].logger - key = get_network(self.engine_name) + key = NETWORKS.get(self.engine_name) key = id(key) if key else self.engine_name self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus()) @@ -107,26 +107,25 @@ class EngineProcessor(ABC): suspended_time = exception_or_message.suspended_time self.suspended_status.suspend(suspended_time, error_message) # pylint: disable=no-member - def _extend_container_basic(self, result_container, start_time, search_results): + def _extend_container_basic(self, result_container, start_time, search_results, network_time=None): # update result_container result_container.extend(self.engine_name, search_results) engine_time = default_timer() - start_time - page_load_time = get_time_for_thread() - result_container.add_timing(self.engine_name, engine_time, page_load_time) + result_container.add_timing(self.engine_name, engine_time, network_time) # metrics counter_inc('engine', self.engine_name, 'search', 'count', 'successful') histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total') - if page_load_time is not None: - histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http') + if network_time is not None: + histogram_observe(network_time, 'engine', self.engine_name, 'time', 'http') - def extend_container(self, result_container, start_time, search_results): + def extend_container(self, result_container, start_time, search_results, network_time=None): if getattr(threading.current_thread(), '_timeout', False): # the main thread is not waiting anymore self.handle_exception(result_container, 'timeout', None) else: # check if the engine accepted the request if search_results is not None: - self._extend_container_basic(result_container, start_time, search_results) + self._extend_container_basic(result_container, start_time, search_results, network_time) self.suspended_status.resume() def extend_container_if_suspended(self, result_container): diff --git a/searx/search/processors/online.py b/searx/search/processors/online.py index f30206dc8..faa0483fc 100644 --- a/searx/search/processors/online.py +++ b/searx/search/processors/online.py @@ -9,6 +9,8 @@ from timeit import default_timer import asyncio import ssl +from typing import Dict, List + import httpx import searx.network @@ -42,13 +44,8 @@ class OnlineProcessor(EngineProcessor): engine_type = 'online' def initialize(self): - # set timeout for all HTTP requests - searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer()) - # reset the HTTP total time - searx.network.reset_time_for_thread() - # set the network - searx.network.set_context_network_name(self.engine_name) - super().initialize() + with searx.network.networkcontext_for_thread(self.engine_name, self.engine.timeout) as network_context: + network_context.call(super().initialize) def get_params(self, search_query, engine_category): """Returns a set of :ref:`request params ` or ``None`` @@ -112,7 +109,8 @@ class OnlineProcessor(EngineProcessor): else: req = searx.network.post - request_args['data'] = params['data'] + if params['data']: + request_args['data'] = params['data'] # send the request response = req(params['url'], **request_args) @@ -133,7 +131,7 @@ class OnlineProcessor(EngineProcessor): return response - def _search_basic(self, query, params): + def _search_basic(self, query, params) -> List[Dict]: # update request parameters dependent on # search-engine (contained in engines folder) self.engine.request(query, params) @@ -153,21 +151,20 @@ class OnlineProcessor(EngineProcessor): return self.engine.response(response) def search(self, query, params, result_container, start_time, timeout_limit): - # set timeout for all HTTP requests - searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time) - # reset the HTTP total time - searx.network.reset_time_for_thread() - # set the network - searx.network.set_context_network_name(self.engine_name) - try: - # send requests and parse the results - search_results = self._search_basic(query, params) - self.extend_container(result_container, start_time, search_results) + with searx.network.networkcontext_for_thread( + self.engine_name, timeout_limit, start_time + ) as network_context: + # send requests and parse the results + search_results = network_context.call(self._search_basic, query, params) + # extend_container in the network context to get the HTTP runtime + self.extend_container( + result_container, start_time, search_results, network_time=network_context.get_http_runtime() + ) except ssl.SSLError as e: # requests timeout (connect or read) self.handle_exception(result_container, e, suspend=True) - self.logger.error("SSLError {}, verify={}".format(e, searx.network.get_network(self.engine_name).verify)) + self.logger.error("SSLError {}, verify={}".format(e, searx.network.NETWORKS.get(self.engine_name).verify)) except (httpx.TimeoutException, asyncio.TimeoutError) as e: # requests timeout (connect or read) self.handle_exception(result_container, e, suspend=True) diff --git a/searx/settings_defaults.py b/searx/settings_defaults.py index a0d0daa09..a418d2725 100644 --- a/searx/settings_defaults.py +++ b/searx/settings_defaults.py @@ -209,9 +209,11 @@ SCHEMA = { 'outgoing': { 'useragent_suffix': SettingsValue(str, ''), 'request_timeout': SettingsValue(numbers.Real, 3.0), - 'enable_http2': SettingsValue(bool, True), - 'verify': SettingsValue((bool, str), True), 'max_request_timeout': SettingsValue((None, numbers.Real), None), + # defaut network + 'verify': SettingsValue((bool, str), True), + 'enable_http': SettingsValue(bool, False), + 'enable_http2': SettingsValue(bool, True), 'pool_connections': SettingsValue(int, 100), 'pool_maxsize': SettingsValue(int, 10), 'keepalive_expiry': SettingsValue(numbers.Real, 5.0), diff --git a/searx/webapp.py b/searx/webapp.py index 53ca96785..eb5f44fcc 100755 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -128,7 +128,7 @@ from searx.autocomplete import search_autocomplete, backends as autocomplete_bac from searx.redisdb import initialize as redis_initialize from searx.sxng_locales import sxng_locales from searx.search import SearchWithPlugins, initialize as search_initialize -from searx.network import stream as http_stream, set_context_network_name +from searx.network import NETWORKS from searx.search.checker import get_result as checker_get_result logger = logger.getChild('webapp') @@ -1050,8 +1050,8 @@ def image_proxy(): 'Sec-GPC': '1', 'DNT': '1', } - set_context_network_name('image_proxy') - resp, stream = http_stream(method='GET', url=url, headers=request_headers, allow_redirects=True) + network_context = NETWORKS.get('image_proxy').get_context() + resp = network_context.stream(method='GET', url=url, headers=request_headers, allow_redirects=True, timeout=30) content_length = resp.headers.get('Content-Length') if content_length and content_length.isdigit() and int(content_length) > maximum_size: return 'Max size', 400 @@ -1082,18 +1082,19 @@ def image_proxy(): logger.exception('HTTP error on closing') def close_stream(): - nonlocal resp, stream + nonlocal resp try: if resp: resp.close() del resp - del stream except httpx.HTTPError as e: logger.debug('Exception while closing response', e) try: headers = dict_subset(resp.headers, {'Content-Type', 'Content-Encoding', 'Content-Length', 'Length'}) - response = Response(stream, mimetype=resp.headers['Content-Type'], headers=headers, direct_passthrough=True) + response = Response( + resp.iter_bytes(), mimetype=resp.headers['Content-Type'], headers=headers, direct_passthrough=True + ) response.call_on_close(close_stream) return response except httpx.HTTPError: diff --git a/searxng_extra/update/update_ahmia_blacklist.py b/searxng_extra/update/update_ahmia_blacklist.py index a11413f14..59b401c9c 100755 --- a/searxng_extra/update/update_ahmia_blacklist.py +++ b/searxng_extra/update/update_ahmia_blacklist.py @@ -13,14 +13,15 @@ Output file: :origin:`searx/data/ahmia_blacklist.txt` (:origin:`CI Update data from os.path import join -import requests from searx import searx_dir +from searx.network import provide_networkcontext, get URL = 'https://ahmia.fi/blacklist/banned/' +@provide_networkcontext() def fetch_ahmia_blacklist(): - resp = requests.get(URL, timeout=3.0) + resp = get(URL, timeout=3.0) if resp.status_code != 200: # pylint: disable=broad-exception-raised raise Exception("Error fetching Ahmia blacklist, HTTP code " + resp.status_code) diff --git a/searxng_extra/update/update_currencies.py b/searxng_extra/update/update_currencies.py index a126d1532..8297b482f 100755 --- a/searxng_extra/update/update_currencies.py +++ b/searxng_extra/update/update_currencies.py @@ -21,6 +21,7 @@ from os.path import join from searx import searx_dir from searx.locales import LOCALE_NAMES, locales_initialize from searx.engines import wikidata, set_loggers +from searx.network import provide_networkcontext set_loggers(wikidata, 'wikidata') locales_initialize() @@ -138,8 +139,8 @@ def get_filename(): return join(join(searx_dir, "data"), "currencies.json") +@provide_networkcontext() def main(): - db = fetch_db() # static diff --git a/searxng_extra/update/update_engine_descriptions.py b/searxng_extra/update/update_engine_descriptions.py index 301ce798d..8e880733e 100755 --- a/searxng_extra/update/update_engine_descriptions.py +++ b/searxng_extra/update/update_engine_descriptions.py @@ -206,7 +206,6 @@ def initialize(): def fetch_wikidata_descriptions(): print('Fetching wikidata descriptions') - searx.network.set_timeout_for_thread(60) result = wikidata.send_wikidata_query( SPARQL_DESCRIPTION.replace('%IDS%', IDS).replace('%LANGUAGES_SPARQL%', LANGUAGES_SPARQL) ) @@ -355,6 +354,7 @@ def get_output(): return output +@searx.network.provide_networkcontext() def main(): initialize() fetch_wikidata_descriptions() diff --git a/searxng_extra/update/update_engine_traits.py b/searxng_extra/update/update_engine_traits.py index 37f4a8745..d16ed54f4 100755 --- a/searxng_extra/update/update_engine_traits.py +++ b/searxng_extra/update/update_engine_traits.py @@ -75,6 +75,7 @@ lang2emoji = { } +@network.provide_networkcontext() def main(): load_engines(settings['engines']) # traits_map = EngineTraitsMap.from_data() @@ -85,7 +86,6 @@ def main(): def fetch_traits_map(): """Fetchs supported languages for each engine and writes json file with those.""" - network.set_timeout_for_thread(10.0) def log(msg): print(msg) diff --git a/searxng_extra/update/update_external_bangs.py b/searxng_extra/update/update_external_bangs.py index 420393982..3d20f443b 100755 --- a/searxng_extra/update/update_external_bangs.py +++ b/searxng_extra/update/update_external_bangs.py @@ -22,8 +22,7 @@ import json import re from os.path import join -import httpx - +import searx.network from searx import searx_dir # pylint: disable=E0401 C0413 from searx.external_bang import LEAF_KEY @@ -35,7 +34,7 @@ HTTP_COLON = 'http:' def get_bang_url(): - response = httpx.get(URL_BV1) + response = searx.network.get(URL_BV1) response.raise_for_status() r = RE_BANG_VERSION.findall(response.text) @@ -43,7 +42,7 @@ def get_bang_url(): def fetch_ddg_bangs(url): - response = httpx.get(url) + response = searx.network.get(url) response.raise_for_status() return json.loads(response.content.decode()) @@ -155,9 +154,14 @@ def get_bangs_filename(): return join(join(searx_dir, "data"), "external_bangs.json") -if __name__ == '__main__': +@searx.network.provide_networkcontext() +def main(): bangs_url, bangs_version = get_bang_url() print(f'fetch bangs from {bangs_url}') output = {'version': bangs_version, 'trie': parse_ddg_bangs(fetch_ddg_bangs(bangs_url))} - with open(get_bangs_filename(), 'w', encoding="utf8") as fp: - json.dump(output, fp, ensure_ascii=False, indent=4) + with open(get_bangs_filename(), 'w', encoding="utf8") as f: + json.dump(output, f, ensure_ascii=False, indent=4) + + +if __name__ == '__main__': + main() diff --git a/searxng_extra/update/update_firefox_version.py b/searxng_extra/update/update_firefox_version.py index 56758b5e8..9b394c81c 100755 --- a/searxng_extra/update/update_firefox_version.py +++ b/searxng_extra/update/update_firefox_version.py @@ -15,9 +15,9 @@ from os.path import join from urllib.parse import urlparse, urljoin from packaging.version import parse -import requests from lxml import html from searx import searx_dir +from searx import network URL = 'https://ftp.mozilla.org/pub/firefox/releases/' RELEASE_PATH = '/pub/firefox/releases/' @@ -37,8 +37,9 @@ useragents = { } +@network.provide_networkcontext() def fetch_firefox_versions(): - resp = requests.get(URL, timeout=2.0) + resp = network.get(URL, timeout=2.0) if resp.status_code != 200: # pylint: disable=broad-exception-raised raise Exception("Error fetching firefox versions, HTTP code " + resp.status_code) diff --git a/searxng_extra/update/update_osm_keys_tags.py b/searxng_extra/update/update_osm_keys_tags.py index 75a55145d..b5ff6e95a 100755 --- a/searxng_extra/update/update_osm_keys_tags.py +++ b/searxng_extra/update/update_osm_keys_tags.py @@ -48,8 +48,8 @@ import collections from pathlib import Path from searx import searx_dir -from searx.network import set_timeout_for_thread from searx.engines import wikidata, set_loggers +from searx.network import provide_networkcontext from searx.sxng_locales import sxng_locales from searx.engines.openstreetmap import get_key_rank, VALUE_TO_LINK @@ -207,12 +207,15 @@ def get_osm_tags_filename(): return Path(searx_dir) / "data" / "osm_keys_tags.json" -if __name__ == '__main__': - - set_timeout_for_thread(60) +@provide_networkcontext() +def main(): result = { 'keys': optimize_keys(get_keys()), 'tags': optimize_tags(get_tags()), } with open(get_osm_tags_filename(), 'w', encoding="utf8") as f: json.dump(result, f, indent=4, ensure_ascii=False, sort_keys=True) + + +if __name__ == '__main__': + main() diff --git a/searxng_extra/update/update_wikidata_units.py b/searxng_extra/update/update_wikidata_units.py index e999b6cfd..a4da2ef99 100755 --- a/searxng_extra/update/update_wikidata_units.py +++ b/searxng_extra/update/update_wikidata_units.py @@ -18,6 +18,7 @@ from os.path import join from searx import searx_dir from searx.engines import wikidata, set_loggers +from searx.network import provide_networkcontext set_loggers(wikidata, 'wikidata') @@ -45,6 +46,7 @@ ORDER BY ?item DESC(?rank) ?symbol """ +@provide_networkcontext() def get_data(): results = collections.OrderedDict() response = wikidata.send_wikidata_query(SARQL_REQUEST) diff --git a/tests/__init__.py b/tests/__init__.py index 8399f0604..8f265b091 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,6 +1,6 @@ import os -import aiounittest +import unittest os.environ.pop('SEARX_DEBUG', None) os.environ.pop('SEARX_DEBUG_LOG_LEVEL', None) @@ -36,7 +36,7 @@ class SearxTestLayer: pass -class SearxTestCase(aiounittest.AsyncTestCase): +class SearxTestCase(unittest.TestCase): """Base test case for non-robot tests.""" layer = SearxTestLayer diff --git a/tests/unit/network/network_settings.yml b/tests/unit/network/network_settings.yml new file mode 100644 index 000000000..ad33049b3 --- /dev/null +++ b/tests/unit/network/network_settings.yml @@ -0,0 +1,97 @@ +server: + secret_key: "user_settings_secret" +outgoing: + networks: + http00: + enable_http: false + enable_http2: false + http10: + enable_http: true + enable_http2: false + http01: + enable_http: false + enable_http2: true + http11: + enable_http: true + enable_http2: true + sock5h: + proxies: socks5h://127.0.0.1:4000 + sock5: + proxies: socks5://127.0.0.1:4000 + sock4: + proxies: socks4://127.0.0.1:4000 +engines: + # + - name: enginea + shortcut: a + engine: dummy + network: + proxies: http://127.0.0.1:8000 + - name: engineb + shortcut: b + engine: dummy + proxies: http://127.0.0.1:8000 + - name: enginec + shortcut: c + engine: dummy + network: + proxies: + all://: http://127.0.0.1:8000 + - name: engined + shortcut: d + engine: dummy + network: + proxies: + all://: + - http://127.0.0.1:8000 + - http://127.0.0.1:9000 + - name: enginee + shortcut: e + engine: dummy + network: + proxies: + all://: + - http://127.0.0.1:8000 + - http://127.0.0.1:9000 + example.com://: + - http://127.0.0.1:6000 + - http://127.0.0.1:7000 + - name: enginef + shortcut: f + engine: dummy + network: + proxies: + - http://127.0.0.1:8000 + - http://127.0.0.1:9000 + - name: engineg + shortcut: g + engine: dummy + network: + local_addresses: + - 192.168.0.1 + - 192.168.0.200 + - name: engineh + shortcut: h + engine: dummy + network: + local_addresses: 192.168.0.2 + - name: ip2 + shortcut: ip2 + engine: dummy + network: + local_addresses: 192.168.0.1/30 + - name: enginei + shortcut: i + engine: dummy + network: + retry_strategy: engine + - name: enginej + shortcut: j + engine: dummy + network: + retry_strategy: SAME_HTTP_CLIENT + - name: enginek + shortcut: k + engine: dummy + network: + retry_strategy: DIFFERENT_HTTP_CLIENT diff --git a/tests/unit/network/test_network.py b/tests/unit/network/test_network.py index 905b981c1..672d6e1f6 100644 --- a/tests/unit/network/test_network.py +++ b/tests/unit/network/test_network.py @@ -1,56 +1,81 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +from typing import Optional from mock import patch - +from parameterized import parameterized, parameterized_class +import time import httpx -from searx.network.network import Network, NETWORKS, initialize +import searx.network +import searx.network.context +from searx import settings +from searx.network.client import BaseHTTPClient, HTTPClient, TorHTTPClient, _HTTPMultiClientConf +from searx.network.network import Network, NETWORKS from tests import SearxTestCase +class TestHTTPClient(SearxTestCase): + def test_get_client(self): + httpclient = BaseHTTPClient(verify=True) + client1 = httpclient._get_client_and_update_kwargs() + client2 = httpclient._get_client_and_update_kwargs(verify=True) + client3 = httpclient._get_client_and_update_kwargs(max_redirects=10) + client4 = httpclient._get_client_and_update_kwargs(verify=True) + client5 = httpclient._get_client_and_update_kwargs(verify=False) + client6 = httpclient._get_client_and_update_kwargs(max_redirects=10) + + self.assertEqual(client1, client2) + self.assertEqual(client1, client4) + self.assertNotEqual(client1, client3) + self.assertNotEqual(client1, client5) + self.assertEqual(client3, client6) + + httpclient.close() + + class TestNetwork(SearxTestCase): def setUp(self): - initialize() + NETWORKS.initialize_from_settings(settings_engines=settings["engines"], settings_outgoing=settings["outgoing"]) def test_simple(self): - network = Network() + network = Network.from_dict() self.assertEqual(next(network._local_addresses_cycle), None) self.assertEqual(next(network._proxies_cycle), ()) def test_ipaddress_cycle(self): - network = NETWORKS['ipv6'] + network = NETWORKS.get('ipv6') self.assertEqual(next(network._local_addresses_cycle), '::') self.assertEqual(next(network._local_addresses_cycle), '::') - network = NETWORKS['ipv4'] + network = NETWORKS.get('ipv4') self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0') self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0') - network = Network(local_addresses=['192.168.0.1', '192.168.0.2']) + network = Network.from_dict(local_addresses=['192.168.0.1', '192.168.0.2']) self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1') self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2') self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1') - network = Network(local_addresses=['192.168.0.0/30']) + network = Network.from_dict(local_addresses=['192.168.0.0/30']) self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1') self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2') self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1') self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2') - network = Network(local_addresses=['fe80::/10']) + network = Network.from_dict(local_addresses=['fe80::/10']) self.assertEqual(next(network._local_addresses_cycle), 'fe80::1') self.assertEqual(next(network._local_addresses_cycle), 'fe80::2') self.assertEqual(next(network._local_addresses_cycle), 'fe80::3') with self.assertRaises(ValueError): - Network(local_addresses=['not_an_ip_address']) + Network.from_dict(local_addresses=['not_an_ip_address']) def test_proxy_cycles(self): - network = Network(proxies='http://localhost:1337') + network = Network.from_dict(proxies='http://localhost:1337') self.assertEqual(next(network._proxies_cycle), (('all://', 'http://localhost:1337'),)) - network = Network(proxies={'https': 'http://localhost:1337', 'http': 'http://localhost:1338'}) + network = Network.from_dict(proxies={'https': 'http://localhost:1337', 'http': 'http://localhost:1338'}) self.assertEqual( next(network._proxies_cycle), (('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338')) ) @@ -58,7 +83,7 @@ class TestNetwork(SearxTestCase): next(network._proxies_cycle), (('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338')) ) - network = Network( + network = Network.from_dict( proxies={'https': ['http://localhost:1337', 'http://localhost:1339'], 'http': 'http://localhost:1338'} ) self.assertEqual( @@ -69,7 +94,7 @@ class TestNetwork(SearxTestCase): ) with self.assertRaises(ValueError): - Network(proxies=1) + Network.from_dict(proxies=1) def test_get_kwargs_clients(self): kwargs = { @@ -78,120 +103,127 @@ class TestNetwork(SearxTestCase): 'timeout': 2, 'allow_redirects': True, } - kwargs_client = Network.extract_kwargs_clients(kwargs) + kwargs_client, kwargs = BaseHTTPClient()._extract_kwargs_clients(kwargs) - self.assertEqual(len(kwargs_client), 2) self.assertEqual(len(kwargs), 2) - self.assertEqual(kwargs['timeout'], 2) - self.assertEqual(kwargs['follow_redirects'], True) + self.assertEqual(kwargs['allow_redirects'], True) - self.assertTrue(kwargs_client['verify']) - self.assertEqual(kwargs_client['max_redirects'], 5) + self.assertIsInstance(kwargs_client, _HTTPMultiClientConf) + self.assertTrue(kwargs_client.verify) + self.assertEqual(kwargs_client.max_redirects, 5) - async def test_get_client(self): - network = Network(verify=True) - client1 = await network.get_client() - client2 = await network.get_client(verify=True) - client3 = await network.get_client(max_redirects=10) - client4 = await network.get_client(verify=True) - client5 = await network.get_client(verify=False) - client6 = await network.get_client(max_redirects=10) + def test_close(self): + network = Network.from_dict(verify=True) + network._get_http_client() + network.close() - self.assertEqual(client1, client2) - self.assertEqual(client1, client4) - self.assertNotEqual(client1, client3) - self.assertNotEqual(client1, client5) - self.assertEqual(client3, client6) - - await network.aclose() - - async def test_aclose(self): - network = Network(verify=True) - await network.get_client() - await network.aclose() - - async def test_request(self): + def test_request(self): a_text = 'Lorem Ipsum' response = httpx.Response(status_code=200, text=a_text) - with patch.object(httpx.AsyncClient, 'request', return_value=response): - network = Network(enable_http=True) - response = await network.request('GET', 'https://example.com/') + with patch.object(httpx.Client, 'request', return_value=response): + network = Network.from_dict(enable_http=True) + http_client = network._get_http_client() + response = http_client.request('GET', 'https://example.com/') self.assertEqual(response.text, a_text) - await network.aclose() + network.close() +@parameterized_class( + [ + {"RETRY_STRATEGY": "ENGINE"}, + {"RETRY_STRATEGY": "SAME_HTTP_CLIENT"}, + {"RETRY_STRATEGY": "DIFFERENT_HTTP_CLIENT"}, + ] +) class TestNetworkRequestRetries(SearxTestCase): - TEXT = 'Lorem Ipsum' + TEXT = "Lorem Ipsum" + RETRY_STRATEGY = "Engine" @classmethod - def get_response_404_then_200(cls): + def get_response_403_then_200(cls): first = True - async def get_response(*args, **kwargs): + def get_response(*args, **kwargs): nonlocal first + request = httpx.Request('GET', 'http://localhost') if first: first = False - return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT) - return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT) + return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT, request=request) + return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT, request=request) return get_response - async def test_retries_ok(self): - with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()): - network = Network(enable_http=True, retries=1, retry_on_http_error=403) - response = await network.request('GET', 'https://example.com/', raise_for_httperror=False) + def test_retries_ok(self): + with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()): + network = Network.from_dict( + enable_http=True, retries=1, retry_on_http_error=403, retry_strategy=self.RETRY_STRATEGY + ) + context = network.get_context(timeout=3600.0) + response = context.request('GET', 'https://example.com/', raise_for_httperror=False) + self.assertEqual(response.status_code, 200) self.assertEqual(response.text, TestNetworkRequestRetries.TEXT) - await network.aclose() + network.close() - async def test_retries_fail_int(self): - with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()): - network = Network(enable_http=True, retries=0, retry_on_http_error=403) - response = await network.request('GET', 'https://example.com/', raise_for_httperror=False) + def test_retries_fail_int(self): + with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()): + network = Network.from_dict( + enable_http=True, retries=0, retry_on_http_error=403, retry_strategy=self.RETRY_STRATEGY + ) + context = network.get_context(timeout=2.0) + response = context.request('GET', 'https://example.com/', raise_for_httperror=False) self.assertEqual(response.status_code, 403) - await network.aclose() + network.close() - async def test_retries_fail_list(self): - with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()): - network = Network(enable_http=True, retries=0, retry_on_http_error=[403, 429]) - response = await network.request('GET', 'https://example.com/', raise_for_httperror=False) + def test_retries_fail_list(self): + with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()): + network = Network.from_dict( + enable_http=True, retries=0, retry_on_http_error=[403, 429], retry_strategy=self.RETRY_STRATEGY + ) + context = network.get_context(timeout=2.0) + response = context.request('GET', 'https://example.com/', raise_for_httperror=False) self.assertEqual(response.status_code, 403) - await network.aclose() + network.close() - async def test_retries_fail_bool(self): - with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()): - network = Network(enable_http=True, retries=0, retry_on_http_error=True) - response = await network.request('GET', 'https://example.com/', raise_for_httperror=False) + def test_retries_fail_bool(self): + with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()): + network = Network.from_dict( + enable_http=True, retries=0, retry_on_http_error=True, retry_strategy=self.RETRY_STRATEGY + ) + context = network.get_context(timeout=2.0) + response = context.request('GET', 'https://example.com/', raise_for_httperror=False) self.assertEqual(response.status_code, 403) - await network.aclose() + network.close() - async def test_retries_exception_then_200(self): + def test_retries_exception_then_200(self): request_count = 0 - async def get_response(*args, **kwargs): + def get_response(*args, **kwargs): nonlocal request_count request_count += 1 if request_count < 3: raise httpx.RequestError('fake exception', request=None) return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT) - with patch.object(httpx.AsyncClient, 'request', new=get_response): - network = Network(enable_http=True, retries=2) - response = await network.request('GET', 'https://example.com/', raise_for_httperror=False) + with patch.object(httpx.Client, 'request', new=get_response): + network = Network.from_dict(enable_http=True, retries=2, retry_strategy=self.RETRY_STRATEGY) + context = network.get_context(timeout=2.0) + response = context.request('GET', 'https://example.com/', raise_for_httperror=False) self.assertEqual(response.status_code, 200) self.assertEqual(response.text, TestNetworkRequestRetries.TEXT) - await network.aclose() + network.close() - async def test_retries_exception(self): - async def get_response(*args, **kwargs): + def test_retries_exception(self): + def get_response(*args, **kwargs): raise httpx.RequestError('fake exception', request=None) - with patch.object(httpx.AsyncClient, 'request', new=get_response): - network = Network(enable_http=True, retries=0) + with patch.object(httpx.Client, 'request', new=get_response): + network = Network.from_dict(enable_http=True, retries=0, retry_strategy=self.RETRY_STRATEGY) + context = network.get_context(timeout=2.0) with self.assertRaises(httpx.RequestError): - await network.request('GET', 'https://example.com/', raise_for_httperror=False) - await network.aclose() + context.request('GET', 'https://example.com/', raise_for_httperror=False) + network.close() class TestNetworkStreamRetries(SearxTestCase): @@ -200,43 +232,246 @@ class TestNetworkStreamRetries(SearxTestCase): @classmethod def get_response_exception_then_200(cls): + from httpx import SyncByteStream + first = True - def stream(*args, **kwargs): + class fake_stream(SyncByteStream): + def __iter__(self): + yield TestNetworkStreamRetries.TEXT.encode() + + def send(*args, **kwargs): nonlocal first if first: first = False raise httpx.RequestError('fake exception', request=None) - return httpx.Response(status_code=200, text=TestNetworkStreamRetries.TEXT) + return httpx.Response(status_code=200, stream=fake_stream()) - return stream + return send - async def test_retries_ok(self): - with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()): - network = Network(enable_http=True, retries=1, retry_on_http_error=403) - response = await network.stream('GET', 'https://example.com/') - self.assertEqual(response.text, TestNetworkStreamRetries.TEXT) - await network.aclose() + def test_retries_ok(self): + with patch.object(httpx.Client, 'send', new=TestNetworkStreamRetries.get_response_exception_then_200()): + network = Network.from_dict(enable_http=True, retries=1, retry_on_http_error=403) + context = network.get_context(timeout=3600.0) + response = context.stream('GET', 'https://example.com/') + btext = b"".join(btext for btext in response.iter_bytes()) + self.assertEqual(btext.decode(), TestNetworkStreamRetries.TEXT) + response.close() + network.close() - async def test_retries_fail(self): - with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()): - network = Network(enable_http=True, retries=0, retry_on_http_error=403) + def test_retries_fail(self): + with patch.object(httpx.Client, 'send', new=TestNetworkStreamRetries.get_response_exception_then_200()): + network = Network.from_dict(enable_http=True, retries=0, retry_on_http_error=403) + context = network.get_context(timeout=2.0) with self.assertRaises(httpx.RequestError): - await network.stream('GET', 'https://example.com/') - await network.aclose() + context.stream('GET', 'https://example.com/') + network.close() - async def test_retries_exception(self): + def test_retries_exception(self): first = True - def stream(*args, **kwargs): + def send(*args, **kwargs): nonlocal first if first: first = False return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT) return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT) - with patch.object(httpx.AsyncClient, 'stream', new=stream): - network = Network(enable_http=True, retries=0, retry_on_http_error=403) - response = await network.stream('GET', 'https://example.com/', raise_for_httperror=False) + with patch.object(httpx.Client, 'send', new=send): + network = Network.from_dict(enable_http=True, retries=0, retry_on_http_error=403) + context = network.get_context(timeout=2.0) + response = context.stream('GET', 'https://example.com/', raise_for_httperror=False) self.assertEqual(response.status_code, 403) - await network.aclose() + network.close() + + +class TestNetworkApi(SearxTestCase): + + TEXT = 'Lorem Ipsum' + + def test_no_networkcontext(self): + with self.assertRaises(searx.network.NetworkContextNotFound): + searx.network.request("GET", "https://example.com") + + def test_provide_networkcontext(self): + send_was_called = False + response = None + + def send(*args, **kwargs): + nonlocal send_was_called + send_was_called = True + return httpx.Response(status_code=200, text=TestNetworkApi.TEXT) + + @searx.network.provide_networkcontext() + def main(): + nonlocal response + response = searx.network.get("https://example.com") + + with patch.object(httpx.Client, 'send', new=send): + main() + + self.assertTrue(send_was_called) + self.assertIsInstance(response, httpx.Response) + self.assertEqual(response.text, TestNetworkApi.TEXT) + + @parameterized.expand( + [ + ("OPTIONS",), + ("HEAD",), + ("DELETE",), + ] + ) + def test_options(self, method): + send_was_called = False + request: Optional[httpx.Request] = None + response = None + + def send(_, actual_request: httpx.Request, **kwargs): + nonlocal request, send_was_called + request = actual_request + send_was_called = True + return httpx.Response(status_code=200, text=TestNetworkApi.TEXT) + + @searx.network.provide_networkcontext() + def main(): + nonlocal response + f = getattr(searx.network, method.lower()) + response = f("https://example.com", params={"a": "b"}, headers={"c": "d"}) + + with patch.object(httpx.Client, 'send', new=send): + main() + + self.assertTrue(send_was_called) + self.assertIsInstance(response, httpx.Response) + self.assertEqual(request.method, method) + self.assertEqual(request.url, "https://example.com?a=b") + self.assertEqual(request.headers["c"], "d") + self.assertEqual(response.text, TestNetworkApi.TEXT) + + @parameterized.expand( + [ + ("POST",), + ("PUT",), + ("PATCH",), + ] + ) + def test_post(self, method): + send_was_called = False + request: Optional[httpx.Request] = None + response = None + + data = "this_is_data" + + def send(_, actual_request: httpx.Request, **kwargs): + nonlocal request, send_was_called + request = actual_request + send_was_called = True + return httpx.Response(status_code=200, text=TestNetworkApi.TEXT) + + @searx.network.provide_networkcontext() + def main(): + nonlocal response + f = getattr(searx.network, method.lower()) + response = f("https://example.com", params={"a": "b"}, headers={"c": "d"}, data=data) + + with patch.object(httpx.Client, 'send', new=send): + main() + + self.assertTrue(send_was_called) + self.assertIsInstance(response, httpx.Response) + self.assertEqual(request.method, method) + self.assertEqual(request.url, "https://example.com?a=b") + self.assertEqual(request.headers["c"], "d") + self.assertEqual(request.content, data.encode()) + self.assertEqual(response.text, TestNetworkApi.TEXT) + + def test_get_remaining_time(self): + def send(*args, **kwargs): + time.sleep(0.25) + return httpx.Response(status_code=200, text=TestNetworkApi.TEXT) + + with patch.object(httpx.Client, 'send', new=send): + with searx.network.networkcontext_for_thread(timeout=3.0) as network_context: + network_context.request("GET", "https://example.com") + network_context.get_http_runtime() + self.assertTrue(network_context.get_http_runtime() > 0.25) + overhead = 0.2 # see NetworkContext.get_remaining_time + self.assertTrue(network_context.get_remaining_time() < (2.75 + overhead)) + + +class TestNetworkRepr(SearxTestCase): + def test_repr(self): + network = Network.from_dict(logger_name="test", retry_strategy="ENGINE") + network_context = network.get_context(5.0) + network_context._set_http_client() + http_client = network_context._get_http_client() + + r_network = repr(network) + r_network_context = repr(network_context) + r_http_client = repr(http_client) + + self.assertEqual(r_network, "") + self.assertTrue(r_network_context.startswith(">")) + self.assertTrue(r_http_client.startswith("