searx.network: refactoring

* Allow an engine to keep one HTTP client for the whole processing of a user query.
* Async code is in one place: client.py, nowhere else. The stream method returns an httpx.Response as expected, instead of a tuple as it does now in the master branch.
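A minimal usage sketch of the refactored API (hedged: the network name 'example' and the URLs are placeholders, the helper names are taken from the diff below):

```python
# Sketch only: get a NetworkContext from the NETWORKS registry, then send requests through it.
from searx.network import NETWORKS

network_context = NETWORKS.get('example').get_context(timeout=5.0)
response = network_context.request('GET', 'https://example.org/')
# stream() now returns an httpx.Response directly instead of a (response, iterator) tuple
streamed = network_context.stream('GET', 'https://example.org/large-file')
for chunk in streamed.iter_raw(65536):
    pass
streamed.close()
```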
Alexandre Flament 2023-08-18 12:16:02 +00:00
parent 1f7e5e109a
commit d5ef764d32
27 changed files with 2281 additions and 884 deletions

View File

@@ -62,6 +62,7 @@ confidence=
disable=duplicate-code,
missing-function-docstring,
consider-using-f-string,
too-few-public-methods,
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View File

@@ -1,5 +1,6 @@
mock==5.1.0
nose2[coverage_plugin]==0.14.0
parameterized==0.9.0
cov-core==1.15.0
black==22.12.0
pylint==3.0.2
@@ -17,7 +18,6 @@ sphinx-autobuild==2021.3.14
sphinx-notfound-page==1.0.0
myst-parser==2.0.0
linuxdoc==20231020
aiounittest==1.4.2
yamllint==1.32.0
wlc==1.13
coloredlogs==15.0.1

View File

@@ -7,10 +7,9 @@ lxml==4.9.3
pygments==2.16.1
python-dateutil==2.8.2
pyyaml==6.0.1
httpx[http2]==0.24.1
httpx[http2]==0.25.0
Brotli==1.1.0
uvloop==0.19.0
httpx-socks[asyncio]==0.7.7
httpx-socks==0.7.8
setproctitle==1.3.3
redis==4.6.0
markdown-it-py==3.0.0

View File

@@ -16,7 +16,7 @@ from searx.engines import (
engines,
google,
)
from searx.network import get as http_get
from searx.network import NETWORKS
from searx.exceptions import SearxEngineResponseException
@@ -24,7 +24,8 @@ def get(*args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = settings['outgoing']['request_timeout']
kwargs['raise_for_httperror'] = True
return http_get(*args, **kwargs)
network_context = NETWORKS.get('autocomplete').get_context()
return network_context.request('GET', *args, **kwargs)
def brave(query, _lang):

View File

@@ -1,266 +1,483 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement
# pyright: basic
# pylint: disable=redefined-outer-name
# ^^ because there is the raise_for_httperror function and the raise_for_httperror parameter.
"""HTTP for SearXNG.
import asyncio
In httpx and similar libraries, a client (also named session) contains a pool of HTTP connections.
The client reuses these HTTP connections and automatically recreates them when the server at the other
end closes the connections. Whatever the library, each client uses only one proxy (possibly none) and only
one local IP address.
SearXNG's primary use case is an engine sending one (or more) outgoing HTTP request(s). The admin can configure
an engine to use multiple proxies and/or IP addresses: SearXNG sends the outgoing HTTP requests through these
different proxies/IP addresses ( = HTTP clients ) on a rotational basis.
In addition, when SearXNG runs an engine request, there is a hard timeout: the engine runtime must not exceed
a defined value.
Moreover, an engine can ask SearXNG to retry a failed HTTP request.
However, we want to keep the engine code simple and keep the complexity either in the configuration or in the
core components (here, in this module).
To answer the above requirements, the `searx.network` module introduces three components:
* HTTPClient and TorHTTPClient are two classes that wrap one or multiple httpx.Client instances
* NetworkManager, a set of named Networks. Each Network
* holds the configuration defined in settings.yml
* creates NetworkContexts fed with an HTTPClient (or TorHTTPClient).
This is where the rotation between the proxies and IP addresses happens.
* NetworkContext to provide a runtime context for the engines. The constructor needs a global timeout
and an HTTPClient factory. NetworkContext is an abstract class with three implementations,
one for each retry policy.
It is only possible to send an HTTP request with a NetworkContext
(otherwise, SearXNG raises a NetworkContextNotFound exception).
Two helpers set a NetworkContext for the current thread:
* The decorator `@provide_networkcontext`, intended for external scripts (see searxng_extra)
* The context manager `networkcontext_for_thread`, for the generic use case.
Inside the thread, the caller can use `searx.network.get`, `searx.network.post` and similar functions without
caring about the HTTP client. However, if the caller creates a new thread, it must initialize a new NetworkContext.
A NetworkContext is most probably thread-safe, but this has not been tested.
The overall architecture:
* searx.network.network.NETWORKS contains all the networks.
The method `NetworkManager.get(network_name)` returns an initialized Network.
* searx.network.network.Network defines one network (a set of proxies, a local IP address, etc.).
They are defined in settings.yml.
The method `Network.get_context()` creates a new NetworkContext.
* searx.network.context contains three different implementations of NetworkContext. One for each retry policy.
* searx.network.client.HTTPClient and searx.network.client.TorHTTPClient implement wrappers around httpx.Client
"""
import threading
import concurrent.futures
from queue import SimpleQueue
from types import MethodType
from timeit import default_timer
from typing import Iterable, NamedTuple, Tuple, List, Dict, Union
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Optional, Union
import httpx
import anyio
from .network import get_network, initialize, check_network_configuration # pylint:disable=cyclic-import
from .client import get_loop
from .raise_for_httperror import raise_for_httperror
from searx.network.client import NOTSET, _NotSetClass
from searx.network.context import NetworkContext, P, R
from searx.network.network import NETWORKS
from searx.network.raise_for_httperror import raise_for_httperror
__all__ = [
"NETWORKS",
"NetworkContextNotFound",
"networkcontext_for_thread",
"provide_networkcontext",
"raise_for_httperror",
"request",
"get",
"options",
"head",
"post",
"put",
"patch",
"delete",
]
THREADLOCAL = threading.local()
"""Thread-local data is data for thread specific values."""
_THREADLOCAL = threading.local()
"""Thread-local that contains only one field: network_context."""
_NETWORK_CONTEXT_KEY = 'network_context'
"""Key to access _THREADLOCAL"""
DEFAULT_MAX_REDIRECTS = httpx._config.DEFAULT_MAX_REDIRECTS # pylint: disable=protected-access
def reset_time_for_thread():
THREADLOCAL.total_time = 0
class NetworkContextNotFound(Exception):
"""A NetworkContext is expected to be set in this thread.
def get_time_for_thread():
"""returns thread's total time or None"""
return THREADLOCAL.__dict__.get('total_time')
def set_timeout_for_thread(timeout, start_time=None):
THREADLOCAL.timeout = timeout
THREADLOCAL.start_time = start_time
def set_context_network_name(network_name):
THREADLOCAL.network = get_network(network_name)
def get_context_network():
"""If set return thread's network.
If unset, return value from :py:obj:`get_network`.
Use searx.network.networkcontext_for_thread or searx.network.provide_networkcontext
to set a NetworkContext
"""
return THREADLOCAL.__dict__.get('network') or get_network()
@contextmanager
def _record_http_time():
# pylint: disable=too-many-branches
time_before_request = default_timer()
start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
try:
yield start_time
finally:
# update total_time.
# See get_time_for_thread() and reset_time_for_thread()
if hasattr(THREADLOCAL, 'total_time'):
time_after_request = default_timer()
THREADLOCAL.total_time += time_after_request - time_before_request
def networkcontext_for_thread(
network_name: Optional[str] = None, timeout: Optional[float] = None, start_time: Optional[float] = None
):
"""Context manager to set a NetworkContext for the current thread
The timeout is for the whole function and is infinite by default (None).
The timeout is counted from the current time or start_time if different from None.
def _get_timeout(start_time, kwargs):
# pylint: disable=too-many-branches
Example of usage:
# timeout (httpx)
if 'timeout' in kwargs:
timeout = kwargs['timeout']
else:
timeout = getattr(THREADLOCAL, 'timeout', None)
if timeout is not None:
kwargs['timeout'] = timeout
```python
from time import sleep
from searx.network import networkcontext_for_thread, get
# 2 minutes timeout for the requests without timeout
timeout = timeout or 120
def search(query):
# the timeout is automatically set to 2.0 seconds (the remaining time for the NetworkContext)
# 2.0 because the timeout for the NetworkContext is 3.0 and one second has elapsed with sleep(1.0)
auckland_time = get("http://worldtimeapi.org/api/timezone/Pacific/Auckland").json()
# the timeout is automatically set to 2.0 - (runtime of the previous HTTP request)
ip_time = get("http://worldtimeapi.org/api/ip").json()
return auckland_time, ip_time
# adjust actual timeout
timeout += 0.2 # overhead
if start_time:
timeout -= default_timer() - start_time
return timeout
def request(method, url, **kwargs):
"""same as requests/requests/api.py request(...)"""
with _record_http_time() as start_time:
network = get_context_network()
timeout = _get_timeout(start_time, kwargs)
future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
try:
return future.result(timeout)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]:
"""send multiple HTTP requests in parallel. Wait for all requests to finish."""
with _record_http_time() as start_time:
# send the requests
network = get_context_network()
loop = get_loop()
future_list = []
for request_desc in request_list:
timeout = _get_timeout(start_time, request_desc.kwargs)
future = asyncio.run_coroutine_threadsafe(
network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
)
future_list.append((future, timeout))
# read the responses
responses = []
for future, timeout in future_list:
try:
responses.append(future.result(timeout))
except concurrent.futures.TimeoutError:
responses.append(httpx.TimeoutException('Timeout', request=None))
except Exception as e: # pylint: disable=broad-except
responses.append(e)
return responses
class Request(NamedTuple):
"""Request description for the multi_requests function"""
method: str
url: str
kwargs: Dict[str, str] = {}
@staticmethod
def get(url, **kwargs):
return Request('GET', url, kwargs)
@staticmethod
def options(url, **kwargs):
return Request('OPTIONS', url, kwargs)
@staticmethod
def head(url, **kwargs):
return Request('HEAD', url, kwargs)
@staticmethod
def post(url, **kwargs):
return Request('POST', url, kwargs)
@staticmethod
def put(url, **kwargs):
return Request('PUT', url, kwargs)
@staticmethod
def patch(url, **kwargs):
return Request('PATCH', url, kwargs)
@staticmethod
def delete(url, **kwargs):
return Request('DELETE', url, kwargs)
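For reference, a hedged sketch of how the pre-refactor helpers above (Request and multi_requests, both removed by this commit) were used; the URLs are placeholders:

```python
# Sketch of the removed API: each entry describes one request; multi_requests sends them
# in parallel and returns, per entry, either an httpx.Response or the raised Exception.
request_list = [
    Request.get('https://example.org/a'),
    Request.post('https://example.org/b', data={'q': 'test'}),
]
for result in multi_requests(request_list):
    if isinstance(result, Exception):
        print('failed:', result)
    else:
        print(result.status_code)
```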
def get(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
def options(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
def head(url, **kwargs):
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
return request('post', url, data=data, **kwargs)
def put(url, data=None, **kwargs):
return request('put', url, data=data, **kwargs)
def patch(url, data=None, **kwargs):
return request('patch', url, data=data, **kwargs)
def delete(url, **kwargs):
return request('delete', url, **kwargs)
async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
try:
async with await network.stream(method, url, **kwargs) as response:
queue.put(response)
# aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
# https://www.python-httpx.org/quickstart/#streaming-responses
async for chunk in response.aiter_raw(65536):
if len(chunk) > 0:
queue.put(chunk)
except (httpx.StreamClosed, anyio.ClosedResourceError):
# the response was queued before the exception.
# the exception was raised on aiter_raw.
# we do nothing here: in the finally block, None will be queued
# so stream(method, url, **kwargs) generator can stop
pass
except Exception as e: # pylint: disable=broad-except
# broad except to avoid this scenario:
# exception in network.stream(method, url, **kwargs)
# -> the exception is not caught here
# -> queue None (in finally)
# -> the function below stream(method, url, **kwargs) has nothing to return
queue.put(e)
finally:
queue.put(None)
def _stream_generator(method, url, **kwargs):
queue = SimpleQueue()
network = get_context_network()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
# yield chunks
obj_or_exception = queue.get()
while obj_or_exception is not None:
if isinstance(obj_or_exception, Exception):
raise obj_or_exception
yield obj_or_exception
obj_or_exception = queue.get()
future.result()
def _close_response_method(self):
asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
# reach the end of self._generator ( _stream_generator ) to avoid a memory leak.
# it makes sure that:
# * the httpx response is closed (see the stream_chunk_to_queue function)
# * future.result() is called in _stream_generator
for _ in self._generator: # pylint: disable=protected-access
continue
def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
"""Replace httpx.stream.
Usage:
response, stream = poolrequests.stream(...)
for chunk in stream:
...
httpx.Client.stream would require writing the httpx.HTTPTransport version of
the httpx.AsyncHTTPTransport declared above.
# "worldtimeapi" is network defined in settings.yml
# network_context.call might call the search function multiple times,
# however the timeout will be respected.
with networkcontext_for_thread('worldtimeapi', timeout=3.0) as network_context:
sleep(1.0)
auckland_time, ip_time = network_context.call(search, query)
print("Auckland time: ", auckland_time["datetime"])
print("My time: ", ip_time["datetime"])
print("HTTP runtime:", network_context.get_http_runtime())
```
"""
generator = _stream_generator(method, url, **kwargs)
network = NETWORKS.get(network_name)
network_context = network.get_context(timeout=timeout, start_time=start_time)
setattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY, network_context)
try:
yield network_context
finally:
delattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY)
del network_context
# yield response
response = next(generator) # pylint: disable=stop-iteration-return
if isinstance(response, Exception):
raise response
response._generator = generator # pylint: disable=protected-access
response.close = MethodType(_close_response_method, response)
def provide_networkcontext(
network_name: Optional[str] = None, timeout: Optional[float] = None, start_time: Optional[float] = None
):
"""Set the NetworkContext, then call the wrapped function using searx.network.context.NetworkContext.call
return response, generator
The timeout is for the whole function and is infinite by default (None).
The timeout is counted from the current time or start_time if different from None
Intended usage: to provide a NetworkContext for scripts in searxng_extra.
Example of usage:
```python
from time import sleep
from searx import network
@network.provide_networkcontext(timeout=3.0)
def main():
sleep(1.0)
# the timeout is automatically set to 2.0 (the remaining time for the NetworkContext).
my_ip = network.get("https://ifconfig.me/ip").text
print(my_ip)
if __name__ == '__main__':
main()
```
"""
def func_outer(func: Callable[P, R]):
@wraps(func)
def func_inner(*args: P.args, **kwargs: P.kwargs) -> R:
with networkcontext_for_thread(network_name, timeout, start_time) as network_context:
return network_context.call(func, *args, **kwargs)
return func_inner
return func_outer
def request(
method: str,
url: str,
params: Optional[httpx._types.QueryParamTypes] = None,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
timeout: httpx._types.TimeoutTypes = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.request ( https://www.python-httpx.org/api/ ) with some differences:
* proxies:
it is not available and has to be defined in the Network configuration (in settings.yml)
* cert:
it is not available and is always None.
* trust_env:
it is not available and is always True.
* timeout:
the implementation uses the lowest timeout between this parameter and remaining time for the NetworkContext.
* allow_redirects:
it replaces the follow_redirects parameter to be compatible with the requests API.
* raise_for_httperror:
when True, this function calls searx.network.raise_for_httperror.raise_for_httperror.
Some parameters from httpx.Client ( https://www.python-httpx.org/api/#client) are available:
* max_redirects:
Set to None to use the value from the Network configuration.
The maximum number of redirect responses that should be followed.
* verify:
Set to None to use the value from the Network configuration.
* limits:
it has to be defined in the Network configuration (in settings.yml)
* default_encoding:
this parameter is not available and is always "utf-8".
This function requires a NetworkContext provided by either provide_networkcontext or networkcontext_for_thread.
The implementation uses one or more httpx.Client
"""
# pylint: disable=too-many-arguments
network_context: Optional[NetworkContext] = getattr(_THREADLOCAL, _NETWORK_CONTEXT_KEY, None)
if network_context is None:
raise NetworkContextNotFound()
http_client = network_context._get_http_client() # pylint: disable=protected-access
return http_client.request(
method,
url,
params=params,
content=content,
data=data,
files=files,
json=json,
headers=headers,
cookies=cookies,
auth=auth,
timeout=timeout,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
raise_for_httperror=raise_for_httperror,
)
def get(
url: str,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = True,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.get, see the request method for the details.
allow_redirects is by default True (httpx default value is False).
"""
# pylint: disable=too-many-arguments
return request(
"GET",
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def options(
url: str,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.options, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"OPTIONS",
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def head(
url: str,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.head, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"HEAD",
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def post(
url: str,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.post, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"POST",
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def put(
url: str,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.put, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"PUT",
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def patch(
url: str,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.patch, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"PATCH",
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
def delete(
url: str,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
auth: Optional[httpx._types.AuthTypes] = None,
allow_redirects: bool = False,
max_redirects: Union[_NotSetClass, int] = NOTSET,
verify: Union[_NotSetClass, httpx._types.VerifyTypes] = NOTSET,
timeout: httpx._types.TimeoutTypes = None,
raise_for_httperror: bool = False,
) -> httpx.Response:
"""Similar to httpx.delete, see the request method for the details."""
# pylint: disable=too-many-arguments
return request(
"DELETE",
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
allow_redirects=allow_redirects,
max_redirects=max_redirects,
verify=verify,
timeout=timeout,
raise_for_httperror=raise_for_httperror,
)
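A hedged usage sketch of the module-level wrappers defined above (the network name 'example' is a placeholder assumed to exist in settings.yml):

```python
# Sketch only: the module-level helpers need a NetworkContext for the current thread,
# and the HTTP client is set while the context runs a function through call().
from searx import network

def fetch():
    return network.get(
        'https://example.org/search',
        params={'q': 'test'},
        allow_redirects=True,       # requests-style name, mapped to httpx follow_redirects
        raise_for_httperror=True,   # apply searx.network.raise_for_httperror to the response
    )

with network.networkcontext_for_thread('example', timeout=10.0) as network_context:
    response = network_context.call(fetch)
    print(response.status_code, response.ok)
```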

View File

@@ -1,35 +1,65 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, global-statement
# pyright: basic
"""Implement various ABCHTTPClient
* OneHTTPClient wrapper around httpx.Client
* BaseHTTPClient httpx.Client accept the verify and max_redirects parameter only in the constructor.
BaseHTTPClient allows to pass these parameter in each query by creating multiple OneHTTPClient.
* HTTPClient Inherit from BaseHTTPClient, raise an error according to retry_on_http_error parameter.
* TorHTTPClient Inherit from HTTPClientSoftError, check Tor connectivity
"""
import asyncio
import logging
import random
from abc import ABC, abstractmethod
from collections import namedtuple
from ssl import SSLContext
import threading
from typing import Any, Dict
from typing import Any, Dict, Optional, Tuple, Union
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url, ProxyConnectionError, ProxyTimeoutError, ProxyError
from httpx_socks import SyncProxyTransport
from python_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError, parse_proxy_url
from searx import logger
from .raise_for_httperror import raise_for_httperror
# Optional uvloop (support Python 3.6)
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
CertTypes = Union[
# certfile
str,
# (certfile, keyfile)
Tuple[str, Optional[str]],
# (certfile, keyfile, password)
Tuple[str, Optional[str], Optional[str]],
]
logger = logger.getChild('searx.network.client')
LOOP = None
SSLCONTEXTS: Dict[Any, SSLContext] = {}
def shuffle_ciphers(ssl_context):
class _NotSetClass: # pylint: disable=too-few-public-methods
"""Internal class for this module, do not create instance of this class.
Replace the None value, allow explicitly pass None as a function argument"""
NOTSET = _NotSetClass()
class SoftRetryHTTPException(Exception):
"""Client implementations raise this exception to tell the NetworkContext
the response is invalid even if there is no HTTP exception.
This exception is INTERNAL to searx.network and must not be seen outside.
See HTTPClient, which checks the HTTP response according to
the retry_on_http_error parameter.
"""
def __init__(self, response):
self.response = response
message = "SoftRetryHTTPException, you should not see this error"
super().__init__(message)
def _shuffle_ciphers(ssl_context):
"""Shuffle httpx's default ciphers of a SSL context randomly.
From `What Is TLS Fingerprint and How to Bypass It`_
@@ -52,18 +82,28 @@ def shuffle_ciphers(ssl_context):
ssl_context.set_ciphers(":".join(sc_list + c_list))
def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
key = (proxy_url, cert, verify, trust_env, http2)
def _get_sslcontexts(
local_address: str,
proxy_url: Optional[str],
cert: Optional[CertTypes],
verify: Union[str, bool],
trust_env: bool,
http2: bool,
):
key = (local_address, proxy_url, cert, verify, trust_env, http2)
if key not in SSLCONTEXTS:
SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
shuffle_ciphers(SSLCONTEXTS[key])
_shuffle_ciphers(SSLCONTEXTS[key])
return SSLCONTEXTS[key]
class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
### Transport
class _HTTPTransportNoHttp(httpx.HTTPTransport):
"""Block HTTP request
The constructor is blank because httpx.AsyncHTTPTransport.__init__ creates an SSLContext unconditionally:
The constructor is blank because httpx.HTTPTransport.__init__ creates an SSLContext unconditionally:
https://github.com/encode/httpx/blob/0f61aa58d66680c239ce43c8cdd453e7dc532bfc/httpx/_transports/default.py#L271
Each SSLContext consumes more than 500kb of memory, since there is about one network per engine.
@@ -78,33 +118,29 @@ class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
# this on purpose if the base class is not called
pass
async def handle_async_request(self, request):
def handle_request(self, request):
raise httpx.UnsupportedProtocol('HTTP protocol is disabled')
async def aclose(self) -> None:
def close(self) -> None:
pass
async def __aenter__(self):
def __enter__(self): # Use generics for subclass support.
return self
async def __aexit__(
self,
exc_type=None,
exc_value=None,
traceback=None,
) -> None:
def __exit__(self, exc_type, exc_value, traceback) -> None: # pylint: disable=signature-differs
# avoid importing the various types for the signature, but pylint is not happy
pass
class AsyncProxyTransportFixed(AsyncProxyTransport):
"""Fix httpx_socks.AsyncProxyTransport
class _CustomSyncProxyTransport(SyncProxyTransport):
"""Inherit from httpx_socks.SyncProxyTransport
Map python_socks exceptions to httpx.ProxyError exceptions
"""
async def handle_async_request(self, request):
def handle_request(self, request):
try:
return await super().handle_async_request(request)
return super().handle_request(request)
except ProxyConnectionError as e:
raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
except ProxyTimeoutError as e:
@@ -113,7 +149,7 @@ class AsyncProxyTransportFixed(AsyncProxyTransport):
raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
def _get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
@@ -125,16 +161,17 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit
rdns = True
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
verify = get_sslcontexts(proxy_url, None, verify, True, http2) if verify is True else verify
return AsyncProxyTransportFixed(
verify = _get_sslcontexts(local_address, proxy_url, None, verify, True, http2) if verify is True else verify
# About verify: in ProxyTransportFixed, verify is of type httpx._types.VerifyTypes
return _CustomSyncProxyTransport(
proxy_type=proxy_type,
proxy_host=proxy_host,
proxy_port=proxy_port,
username=proxy_username,
password=proxy_password,
rdns=rdns,
loop=get_loop(),
verify=verify,
verify=verify, # type: ignore
http2=http2,
local_address=local_address,
limits=limit,
@@ -142,9 +179,9 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit
)
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
verify = get_sslcontexts(None, None, verify, True, http2) if verify is True else verify
return httpx.AsyncHTTPTransport(
def _get_transport(verify, http2, local_address, proxy_url, limit, retries):
verify = _get_sslcontexts(local_address, None, None, verify, True, http2) if verify is True else verify
return httpx.HTTPTransport(
# pylint: disable=protected-access
verify=verify,
http2=http2,
@@ -155,56 +192,378 @@ def get_transport(verify, http2, local_address, proxy_url, limit, retries):
)
def new_client(
### Clients
class ABCHTTPClient(ABC):
"""Abstract HTTP client
Multiple implementations are defined below.
They are like an onion: each implementation relies on the previous one
and brings new features.
"""
@abstractmethod
def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response:
pass
@abstractmethod
def close(self):
pass
@property
@abstractmethod
def is_closed(self) -> bool:
pass
def request(self, method, url, **kwargs) -> httpx.Response:
return self.send(False, method, url, **kwargs)
def stream(self, method, url, **kwargs) -> httpx.Response:
return self.send(True, method, url, **kwargs)
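A minimal hedged sketch (not part of the diff) of an ABCHTTPClient implementation; request() and stream() are inherited and only differ in the stream flag they pass to send():

```python
# Sketch only: a do-nothing client that satisfies the ABCHTTPClient interface
# (httpx is already imported at the top of this module).
class DummyHTTPClient(ABCHTTPClient):
    def __init__(self):
        self._closed = False

    def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response:
        # a real implementation would perform the HTTP request here
        return httpx.Response(200, request=httpx.Request(method, url), content=b'')

    def close(self):
        self._closed = True

    @property
    def is_closed(self) -> bool:
        return self._closed
```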
class OneHTTPClient(ABCHTTPClient):
"""Wrap a httpx.Client
Use httpx_socks for socks proxies.
Deal with httpx.RemoteProtocolError exception: httpx raises this exception when the
HTTP/2 server disconnect. It is excepted to reconnect.
Related to https://github.com/encode/httpx/issues/1478
Perhaps it can be removed now : TODO check in production.
To be backward compatible with Request:
* In Response, "ok" is set to "not response.is_error()"
See https://www.python-httpx.org/compatibility/#checking-for-success-and-failure-responses
* allow_redirects is accepted
See https://www.python-httpx.org/compatibility/#redirects
"""
def __init__(
# pylint: disable=too-many-arguments
enable_http,
verify,
enable_http2,
max_connections,
max_keepalive_connections,
keepalive_expiry,
proxies,
local_address,
retries,
max_redirects,
hook_log_response,
self,
verify=True,
enable_http=True,
enable_http2=False,
max_connections=None,
max_keepalive_connections=None,
keepalive_expiry=None,
proxies=None,
local_addresses=None,
max_redirects=30,
hook_log_response=None,
log_trace=None,
allow_redirects=True,
logger=None,
):
self.enable_http = enable_http
self.verify = verify
self.enable_http2 = enable_http2
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
self.keepalive_expiry = keepalive_expiry
self.proxies = proxies or {}
self.local_address = local_addresses
self.max_redirects = max_redirects
self.hook_log_response = hook_log_response
self.allow_redirects = allow_redirects
self.logger = logger
self.extensions = None
if log_trace:
self.extensions = {"trace": log_trace}
self._new_client()
def send(self, stream, method, url, timeout=None, **kwargs):
self._patch_request(kwargs)
retry = 1
response = None
while retry >= 0: # pragma: no cover
retry -= 1
try:
if stream:
# from https://www.python-httpx.org/async/#streaming-responses
# > For situations when context block usage is not practical,
# > it is possible to enter "manual mode" by sending a Request
# > instance using client.send(..., stream=True).
request = self.client.build_request(
method=method,
url=url,
content=kwargs.get("content"),
data=kwargs.get("data"),
files=kwargs.get("files"),
json=kwargs.get("json"),
params=kwargs.get("params"),
headers=kwargs.get("headers"),
cookies=kwargs.get("cookies"),
timeout=timeout,
extensions=self.extensions,
)
response = self.client.send(
request,
stream=True,
follow_redirects=kwargs.get("follow_redirects", False),
auth=kwargs.get("auth"),
)
else:
response = self.client.request(method, url, extensions=self.extensions, timeout=timeout, **kwargs)
self._patch_response(response)
return response
except httpx.RemoteProtocolError as e:
if retry >= 0:
# the server has closed the connection:
# try again without decreasing the retries variable & with a new HTTP client
self._reconnect_client()
if self.logger:
self.logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
continue
raise e
except (httpx.RequestError, httpx.HTTPStatusError) as e:
raise e
return response # type: ignore
def close(self):
self.client.close()
@property
def is_closed(self) -> bool:
return self.client.is_closed
def _new_client(self):
limit = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
keepalive_expiry=keepalive_expiry,
max_connections=self.max_connections,
max_keepalive_connections=self.max_keepalive_connections,
keepalive_expiry=self.keepalive_expiry,
)
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in proxies.items():
if not enable_http and pattern.startswith('http://'):
for pattern, proxy_url in self.proxies.items():
if not self.enable_http and pattern.startswith('http://'):
continue
if proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(
verify, enable_http2, local_address, proxy_url, limit, retries
if (
proxy_url.startswith('socks4://')
or proxy_url.startswith('socks5://')
or proxy_url.startswith('socks5h://')
):
mounts[pattern] = _get_transport_for_socks_proxy(
self.verify, self.enable_http2, self.local_address, proxy_url, limit, 0
)
else:
mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
mounts[pattern] = _get_transport(
self.verify, self.enable_http2, self.local_address, proxy_url, limit, 0
)
if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()
if not self.enable_http:
mounts['http://'] = _HTTPTransportNoHttp()
transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
transport = _get_transport(self.verify, self.enable_http2, self.local_address, None, limit, 0)
event_hooks = None
if hook_log_response:
event_hooks = {'response': [hook_log_response]}
return httpx.AsyncClient(
if self.hook_log_response:
event_hooks = {'response': [self.hook_log_response]}
self.client = httpx.Client(
transport=transport,
mounts=mounts,
max_redirects=max_redirects,
max_redirects=self.max_redirects,
event_hooks=event_hooks,
)
def _reconnect_client(self):
self.client.close()
self._new_client()
def get_loop():
return LOOP
def _patch_request(self, kwargs):
# see https://www.python-httpx.org/compatibility/#redirects
follow_redirects = self.allow_redirects
if 'allow_redirects' in kwargs:
# see https://github.com/encode/httpx/pull/1808
follow_redirects = kwargs.pop('allow_redirects')
kwargs['follow_redirects'] = follow_redirects
def _patch_response(self, response):
if isinstance(response, httpx.Response):
# requests compatibility (response is not streamed)
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error # type: ignore
return response
_HTTPMultiClientConf = namedtuple('HTTPMultiClientConf', ['verify', 'max_redirects'])
class BaseHTTPClient(ABCHTTPClient):
"""Some parameter like verify, max_redirects are defined at the client level,
not at the request level.
This class allow to specify these parameters at the request level.
The implementation uses multiple instances of OneHTTPClient
This class does not deal with the retry_on_http_error parameter
"""
def __init__(
self,
**kwargs,
):
# set the default values
self.default = _HTTPMultiClientConf(True, 30)
# extract the values from the HTTPClient constructor kwargs
# the line above is required so that self._extract_kwargs_clients can run
# and keep the other arguments
self.default, self.kwargs = self._extract_kwargs_clients(kwargs)
self.clients: Dict[Tuple, OneHTTPClient] = {}
def close(self):
for client in self.clients.values():
client.close()
@property
def is_closed(self) -> bool:
return all(client.is_closed for client in self.clients.values())
def send(self, stream, method, url, timeout=None, **kwargs):
client, kwargs = self._get_client_and_update_kwargs(**kwargs)
return client.send(stream, method, url, timeout, **kwargs)
def _get_client_and_update_kwargs(self, **kwargs) -> Tuple[OneHTTPClient, Dict]:
# extract HTTPMultiClientConf using the parameter in the request
# and fallback to the parameters defined in the constructor
# = the parameters set in the network settings
kwargs_clients, kwargs = self._extract_kwargs_clients(kwargs)
if kwargs_clients not in self.clients:
self.clients[kwargs_clients] = OneHTTPClient(
verify=kwargs_clients.verify,
max_redirects=kwargs_clients.max_redirects,
**self.kwargs,
)
return self.clients[kwargs_clients], kwargs
def _extract_kwargs_clients(self, kwargs) -> Tuple[_HTTPMultiClientConf, Dict]:
# default values
# see https://www.python-httpx.org/compatibility/#ssl-configuration
verify = kwargs.pop('verify', NOTSET)
max_redirects = kwargs.pop('max_redirects', NOTSET)
if verify == NOTSET:
verify = self.default.verify
if max_redirects == NOTSET:
max_redirects = self.default.max_redirects
return _HTTPMultiClientConf(verify, max_redirects), kwargs
class HTTPClient(BaseHTTPClient):
"""Inherit from BaseHTTPClient, raise an exception according to the retry_on_http_error parameter"""
def __init__(self, retry_on_http_error=None, **kwargs):
super().__init__(**kwargs)
self.retry_on_http_error = retry_on_http_error
self._check_configuration()
def _check_configuration(self):
# make sure we can create at least one OneHTTPClient without exception
self._get_client_and_update_kwargs()
def send(self, stream, method, url, timeout=None, **kwargs):
try:
do_raise_for_httperror = self._extract_do_raise_for_httperror(kwargs)
response = super().send(stream, method, url, timeout=timeout, **kwargs)
if do_raise_for_httperror:
raise_for_httperror(response)
if self._is_error_but_retry(response):
raise SoftRetryHTTPException(response)
return response
except (httpx.RequestError, httpx.HTTPStatusError) as e:
raise e
def _is_error_but_retry(self, response):
# pylint: disable=too-many-boolean-expressions
if (
(self.retry_on_http_error is True and 400 <= response.status_code <= 599)
or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error)
or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error)
):
return True
return False
@staticmethod
def _extract_do_raise_for_httperror(kwargs):
do_raise_for_httperror = True
if 'raise_for_httperror' in kwargs:
do_raise_for_httperror = kwargs['raise_for_httperror']
del kwargs['raise_for_httperror']
return do_raise_for_httperror
def __repr__(self):
keys_values = " ".join([f"{k}={v!r}" for k, v in self.kwargs.items()])
return f"<{self.__class__.__name__} retry_on_http_error={self.retry_on_http_error!r} {keys_values}>"
class TorHTTPClient(HTTPClient):
"""Extend HTTPClientSoftError client. To use with Tor configuration.
The class checks if the client is really connected through Tor.
"""
_TOR_CHECK_RESULT = {}
def __init__(self, proxies=None, local_addresses=None, **kwargs):
self.proxies = proxies
self.local_addresses = local_addresses
super().__init__(proxies=proxies, local_addresses=local_addresses, **kwargs)
def _check_configuration(self):
if not self._is_connected_through_tor(self.proxies, self.local_addresses):
self.close()
raise httpx.HTTPError('Network configuration problem: not using Tor')
def _is_connected_through_tor(self, proxies, local_addresses) -> bool:
"""TODO : rewrite to check the proxies variable instead of checking the HTTPTransport ?"""
if proxies is None:
return False
cache_key = (local_addresses, tuple(proxies.items()))
if cache_key in TorHTTPClient._TOR_CHECK_RESULT:
return TorHTTPClient._TOR_CHECK_RESULT[cache_key]
# False means the client uses the DNS of the proxy (remote DNS)
use_local_dns = False
# get one httpx client through get_client_and_update_kwargs
one_http_client, _ = self._get_client_and_update_kwargs(verify=True)
httpx_client = one_http_client.client
# ignore client._transport because it is not used with all://
for transport in httpx_client._mounts.values(): # pylint: disable=protected-access
if isinstance(transport, _HTTPTransportNoHttp):
# ignore the NO HTTP transport
continue
if isinstance(transport, _CustomSyncProxyTransport) and not getattr(
transport._pool, "_rdns", False # pylint: disable=protected-access # type: ignore
):
# socks5:// with local DNS
# socks5h:// with remote DNS is expected in order to resolve .onion domains.
use_local_dns = True
break
#
if use_local_dns:
# no test
result = False
else:
# actual check
response = one_http_client.request("GET", "https://check.torproject.org/api/ip", timeout=60)
if response.status_code != 200:
result = False
else:
result = bool(response.json().get("IsTor", False))
TorHTTPClient._TOR_CHECK_RESULT[cache_key] = result
return result
@staticmethod
def _clear_cache():
"""Only for the tests"""
TorHTTPClient._TOR_CHECK_RESULT = {}
def init():
@@ -220,18 +579,5 @@ def init():
):
logging.getLogger(logger_name).setLevel(logging.WARNING)
# loop
def loop_thread():
global LOOP
LOOP = asyncio.new_event_loop()
LOOP.run_forever()
thread = threading.Thread(
target=loop_thread,
name='asyncio_loop',
daemon=True,
)
thread.start()
init()

searx/network/context.py (new file, 387 lines added)
View File

@@ -0,0 +1,387 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pyright: basic
"""This module implements various NetworkContext which deals with
* retry strategies: what to do when an HTTP request fails and retries>0
* record HTTP runtime
* timeout: In engines, the user query starts at one point in time,
the engine timeout is the starting point plus a defined value.
NetworkContext sends HTTP requests following the request timeout and the engine timeouts.
Example of usage:
```
context = NetworkContextRetryFunction(...) # or another implementation
def my_engine():
http_client = context.get_http_client()
ip_ifconfig = http_client.request("GET", "https://ifconfig.me/")
print("ip from ifconfig.me ", ip_ifconfig)
ip_myip = http_client.request("GET", "https://api.myip.com").json()["ip"]
print("ip from api.myip.com", ip_myip)
assert ip_ifconfig == ip_myip
# ^^ always true with NetworkContextRetryFunction and NetworkContextRetrySameHTTPClient
result = context.call(my_engine)
print('HTTP runtime:', context.get_total_time())
```
Note: in the code above, NetworkContextRetryFunction is instantiated directly for the sake of simplicity.
NetworkContexts are actually instantiated using Network.get_context(...).
Various implementations define what to do when there is an exception in the function `my_engine`:
* `NetworkContextRetryFunction` gets another HTTP client and tries the whole function again.
* `NetworkContextRetryDifferentHTTPClient` gets another HTTP client and tries the query again.
* `NetworkContextRetrySameHTTPClient` tries the query again with the same HTTP client.
"""
import functools
import ssl
from abc import ABC, abstractmethod
from contextlib import contextmanager
from timeit import default_timer
from typing import Callable, Optional, final
try:
from typing import ParamSpec, TypeVar
except ImportError:
# to support Python < 3.10
from typing_extensions import ParamSpec, TypeVar
import httpx
from searx.network.client import ABCHTTPClient, SoftRetryHTTPException
P = ParamSpec('P')
R = TypeVar('R')
HTTPCLIENTFACTORY = Callable[[], ABCHTTPClient]
DEFAULT_TIMEOUT = 120.0
## NetworkContext
class NetworkContext(ABC):
"""Abstract implementation: the call must defined in concrete classes.
Lifetime: one engine request or initialization of an engine.
"""
__slots__ = ('_retries', '_http_client_factory', '_http_client', '_start_time', '_http_time', '_timeout')
def __init__(
self,
retries: int,
http_client_factory: HTTPCLIENTFACTORY,
start_time: Optional[float],
timeout: Optional[float],
):
self._retries: int = retries
# wrap http_client_factory here, so we can forget about this wrapping
self._http_client_factory = _TimeHTTPClientWrapper.wrap_factory(http_client_factory, self)
self._http_client: Optional[ABCHTTPClient] = None
self._start_time: float = start_time or default_timer()
self._http_time: float = 0.0
self._timeout: Optional[float] = timeout
@abstractmethod
def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
"""Call func within the network context.
The retry policy might call func multiple times.
Within the function self.get_http_client() returns an HTTP client to use.
The retry policy might send the same HTTP request multiple times
until it succeeds or the retry count falls to zero.
"""
@final
def request(self, *args, **kwargs):
"""Convenient method to wrap a call to request inside the call method.
Use a new HTTP client to wrap a call to the request method using self.call
"""
self._set_http_client()
return self.call(self._get_http_client().request, *args, **kwargs)
@final
def stream(self, *args, **kwargs):
"""Convenient method to wrap a call to stream inside the call method.
Use a new HTTP client to wrap a call to the stream method using self.call
"""
self._set_http_client()
return self.call(self._get_http_client().stream, *args, **kwargs)
@final
def get_http_runtime(self) -> Optional[float]:
"""Return the amount of time spent on HTTP requests"""
return self._http_time
@final
def get_remaining_time(self, _override_timeout: Optional[float] = None) -> float:
"""Return the remaining time for the context.
_override_timeout is not intended to be used outside this module.
"""
timeout = _override_timeout or self._timeout or DEFAULT_TIMEOUT
timeout += 0.2 # overhead
timeout -= default_timer() - self._start_time
return timeout
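A hedged arithmetic sketch of what get_remaining_time computes (the values are illustrative):

```python
# remaining = (override or context timeout or DEFAULT_TIMEOUT) + 0.2s overhead - elapsed time
start_time = 100.0   # default_timer() value when the NetworkContext was created
timeout = 3.0        # timeout passed to Network.get_context()
now = 101.0          # default_timer() one second later
remaining = timeout + 0.2 - (now - start_time)   # 2.2 seconds left for HTTP requests
```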
@final
def _get_http_client(self) -> ABCHTTPClient:
"""Return the HTTP client to use for this context."""
if self._http_client is None:
raise ValueError("HTTP client has not been set")
return self._http_client
@final
def _set_http_client(self):
"""Ask the NetworkContext to use another HTTP client using the factory.
Use the method _get_new_client_from_factory() to call the factory,
so the NetworkContext implementations can wrap the ABCHTTPClient.
"""
self._http_client = self._get_new_client_from_factory()
@final
def _reset_http_client(self):
self._http_client = None
def _get_new_client_from_factory(self):
return self._http_client_factory()
@contextmanager
def _record_http_time(self):
"""This decorator records the code's runtime and adds it to self.total_time"""
time_before_request = default_timer()
try:
yield
finally:
self._http_time += default_timer() - time_before_request
def __repr__(self):
common_attributes = (
f"{self.__class__.__name__}"
+ f" retries={self._retries!r} timeout={self._timeout!r} http_client={self._http_client!r}"
)
# get the original factory : see the __init__ method of this class and _TimeHTTPClientWrapper.wrap_factory
factory = self._http_client_factory.__wrapped__
# break the abstraction safely: get back the Network object through the bound method
# see Network.get_context
bound_instance = getattr(factory, "__self__", None)
if bound_instance is not None and hasattr(bound_instance, 'get_context'):
# bound_instance has a "get_context" attribute: this is most likely a Network
# searx.network.network.Network is not imported to avoid circular import
return f"<{common_attributes} network_context={factory.__self__!r}>"
# fallback : this instance was not created using Network.get_context
return f"<{common_attributes} http_client_factory={factory!r}>"
## Measure time and deal with timeout
class _TimeHTTPClientWrapper(ABCHTTPClient):
"""Wrap an ABCHTTPClient:
* to record the HTTP runtime
* to override the timeout to make sure the total time does not exceed the timeout set on the NetworkContext
"""
__slots__ = ('http_client', 'network_context')
@staticmethod
def wrap_factory(http_client_factory: HTTPCLIENTFACTORY, network_context: NetworkContext):
"""Return a factory which wraps the result of http_client_factory with _TimeHTTPClientWrapper instance."""
functools.wraps(http_client_factory)
def wrapped_factory():
return _TimeHTTPClientWrapper(http_client_factory(), network_context)
wrapped_factory.__wrapped__ = http_client_factory
return wrapped_factory
def __init__(self, http_client: ABCHTTPClient, network_context: NetworkContext) -> None:
self.http_client = http_client
self.network_context = network_context
def send(self, stream, method, url, **kwargs) -> httpx.Response:
"""Send the HTTP request using self.http_client
Inaccurate with stream: the method would have to record the HTTP time in the close method of httpx.Response.
This is not a problem since streams are only used for the image proxy.
"""
with self.network_context._record_http_time(): # pylint: disable=protected-access
timeout = self._extract_timeout(kwargs)
return self.http_client.send(stream, method, url, timeout=timeout, **kwargs)
def close(self):
return self.http_client.close()
@property
def is_closed(self) -> bool:
return self.http_client.is_closed
def _extract_timeout(self, kwargs):
"""Extract the timeout parameter and adjust it according to the remaining time"""
timeout = kwargs.pop('timeout', None)
return self.network_context.get_remaining_time(timeout)
## NetworkContextRetryFunction
class NetworkContextRetryFunction(NetworkContext):
"""When an HTTP request fails, this NetworkContext tries again
the whole function with another HTTP client.
This guarantees that func has the same HTTP client all along.
"""
def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
try:
while self._retries >= 0 and self.get_remaining_time() > 0:
self._set_http_client()
try:
return func(*args, **kwargs) # type: ignore
except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError, SoftRetryHTTPException) as e:
if self._retries <= 0:
# raise the exception only when there are no retries left
raise e
self._retries -= 1
raise httpx.TimeoutException("Timeout")
finally:
self._reset_http_client()
def _get_new_client_from_factory(self):
return _RetryFunctionHTTPClient(super()._get_new_client_from_factory(), self)
class _RetryFunctionHTTPClient(ABCHTTPClient):
"""Companion class of NetworkContextRetryFunction
Do one thing: if the retry count of the NetworkContext is zero and there is a SoftRetryHTTPException,
then the send method catches this exception and returns the HTTP response.
This makes sure the SoftRetryHTTPException exception is not seen outside the searx.network module.
"""
def __init__(self, http_client: ABCHTTPClient, network_context: NetworkContextRetryFunction):
self.http_client = http_client
self.network_context = network_context
def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response:
try:
return self.http_client.send(stream, method, url, **kwargs)
except SoftRetryHTTPException as e:
if self.network_context._retries <= 0: # pylint: disable=protected-access
return e.response
raise e
def close(self):
return self.http_client.close()
@property
def is_closed(self) -> bool:
return self.http_client.is_closed
## NetworkContextRetrySameHTTPClient
class NetworkContextRetrySameHTTPClient(NetworkContext):
"""When an HTTP request fails, this NetworkContext tries again
the same HTTP request with the same HTTP client
The implementation wraps the provided ABCHTTPClient with _RetrySameHTTPClient."""
def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
try:
self._set_http_client()
return func(*args, **kwargs) # type: ignore
finally:
self._reset_http_client()
def _get_new_client_from_factory(self):
return _RetrySameHTTPClient(super()._get_new_client_from_factory(), self)
class _RetrySameHTTPClient(ABCHTTPClient):
"""Companion class of NetworkContextRetrySameHTTPClient"""
def __init__(self, http_client: ABCHTTPClient, network_context: NetworkContextRetrySameHTTPClient):
self.http_client = http_client
self.network_context = network_context
def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response:
retries = self.network_context._retries # pylint: disable=protected-access
while retries >= 0 and self.network_context.get_remaining_time() > 0:
try:
return self.http_client.send(stream, method, url, **kwargs)
except SoftRetryHTTPException as e:
if retries <= 0:
return e.response
except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
return self.http_client.send(stream, method, url, **kwargs)
def close(self):
return self.http_client.close()
@property
def is_closed(self) -> bool:
return self.http_client.is_closed
## NetworkContextRetryDifferentHTTPClient
class NetworkContextRetryDifferentHTTPClient(NetworkContext):
"""When a HTTP request fails, this NetworkContext tries again
the same HTTP request with a different HTTP client
The implementation wraps the provided ABCHTTPClient with _RetryDifferentHTTPClient."""
def call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
self._set_http_client()
try:
return func(*args, **kwargs) # type: ignore
finally:
self._reset_http_client()
def _get_new_client_from_factory(self):
return _RetryDifferentHTTPClient(self)
class _RetryDifferentHTTPClient(ABCHTTPClient):
"""Companion class of NetworkContextRetryDifferentHTTPClient"""
def __init__(self, network_context: NetworkContextRetryDifferentHTTPClient) -> None:
self.network_context = network_context
def send(self, stream: bool, method: str, url: str, **kwargs) -> httpx.Response:
retries = self.network_context._retries # pylint: disable=protected-access
while retries >= 0 and self.network_context.get_remaining_time() > 0:
http_client = self.network_context._http_client_factory() # pylint: disable=protected-access
try:
return http_client.send(stream, method, url, **kwargs)
except SoftRetryHTTPException as e:
if retries <= 0:
return e.response
except (ssl.SSLError, httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
http_client = self.network_context._http_client_factory() # pylint: disable=protected-access
return http_client.send(stream, method, url, **kwargs)
def close(self):
raise NotImplementedError()
@property
def is_closed(self) -> bool:
raise NotImplementedError()
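# A minimal usage sketch (assumptions: NETWORKS.initialize_from_settings() has already
# run, and the URL is illustrative). Which of the NetworkContext subclasses above gets
# instantiated depends on the network's retry_strategy setting.
def sketch_networkcontext_usage():
    from searx.network import NETWORKS  # imported lazily, only for this sketch

    context = NETWORKS.get().get_context(timeout=3.0)
    # direct request: retries are applied according to the retry strategy
    response = context.request('GET', 'https://example.org/', raise_for_httperror=False)
    # engine code can also run inside call(), so its HTTP time is accounted for
    context.call(lambda: None)
    return response.status_code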

View File

@ -1,24 +1,265 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=global-statement
# pylint: disable=missing-module-docstring, missing-class-docstring
# pylint: disable=missing-class-docstring
# pyright: basic
"""Deal with
* create Networks from settings.yml
* each Network contains an ABCHTTPClient for each (proxies, IP addresses). Lazy initialized.
* a Network provides two methods:
* get_http_client: returns an HTTP client. Prefer get_context, since the
retry strategy is ignored with get_http_client
* get_context: provides a runtime context for the engine, see searx.network.context
"""
import atexit
import asyncio
import ipaddress
from dataclasses import dataclass, field
from enum import Enum
from itertools import cycle
from typing import Dict
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
import httpx
from searx import logger, searx_debug
from .client import new_client, get_loop, AsyncHTTPTransportNoHttp
from .raise_for_httperror import raise_for_httperror
from searx.network.client import HTTPClient, TorHTTPClient
from searx.network.context import (
NetworkContext,
NetworkContextRetryDifferentHTTPClient,
NetworkContextRetryFunction,
NetworkContextRetrySameHTTPClient,
)
logger = logger.getChild('network')
DEFAULT_NAME = '__DEFAULT__'
NETWORKS: Dict[str, 'Network'] = {}
class RetryStrategy(Enum):
ENGINE = NetworkContextRetryFunction
SAME_HTTP_CLIENT = NetworkContextRetrySameHTTPClient
DIFFERENT_HTTP_CLIENT = NetworkContextRetryDifferentHTTPClient
TYPE_IP_ANY = Union[ # pylint: disable=invalid-name
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network,
]
TYPE_RETRY_ON_ERROR = Union[List[int], int, bool] # pylint: disable=invalid-name
@dataclass(order=True, frozen=True)
class NetworkSettings:
"""Configuration for a Network. See NetworkSettingsReader"""
# Individual HTTP requests can override these parameters.
verify: bool = True
max_redirects: int = 30
# These parameters can not be overridden.
enable_http: bool = False
enable_http2: bool = True
max_connections: Optional[int] = 10
max_keepalive_connections: Optional[int] = 100
keepalive_expiry: Optional[float] = 5.0
local_addresses: List[TYPE_IP_ANY] = field(default_factory=list)
proxies: Dict[str, List[str]] = field(default_factory=dict)
using_tor_proxy: bool = False
retries: int = 0
retry_strategy: RetryStrategy = RetryStrategy.DIFFERENT_HTTP_CLIENT
retry_on_http_error: Optional[TYPE_RETRY_ON_ERROR] = None
logger_name: Optional[str] = None
class Network:
"""Provides NetworkContext and ABCHTTPClient following NetworkSettings.
A Network might have multiple IP addresses and proxies;
in this case, each call to get_context or get_http_client provides a different
configuration.
"""
__slots__ = (
'_settings',
'_local_addresses_cycle',
'_proxies_cycle',
'_clients',
'_logger',
)
def __init__(self, settings: NetworkSettings):
"""Creates a Network from a NetworkSettings"""
self._settings = settings
self._local_addresses_cycle = self._get_local_addresses_cycle()
self._proxies_cycle = self._get_proxy_cycles()
self._clients: Dict[Tuple, HTTPClient] = {}
self._logger = logger.getChild(settings.logger_name) if settings.logger_name else logger
@staticmethod
def from_dict(**kwargs):
"""Creates a Network from a keys/values"""
return Network(NetwortSettingsDecoder.from_dict(kwargs))
def close(self):
"""Close all the ABCHTTPClient hold by the Network"""
for client in self._clients.values():
client.close()
def check_configuration(self) -> bool:
"""Check if the network configuration is valid.
Typical use case: check if the proxy is really a Tor proxy"""
try:
self._get_http_client()
return True
except Exception: # pylint: disable=broad-except
self._logger.exception('Error')
return False
def get_context(self, timeout: Optional[float] = None, start_time: Optional[float] = None) -> NetworkContext:
"""Return a new NetworkContext"""
context_cls = self._settings.retry_strategy.value
return context_cls(self._settings.retries, self._get_http_client, start_time, timeout)
def _get_http_client(self) -> HTTPClient:
"""Return an HTTP client.
Different HTTP clients are returned according to the configuration.
For example, if two proxies are defined,
the first call to this function returns an HTTP client using the first proxy.
A second call returns an HTTP client using the second proxy.
A third call returns the same HTTP client from the first call, using the first proxy.
"""
local_addresses = next(self._local_addresses_cycle)
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
key = (local_addresses, proxies)
if key not in self._clients or self._clients[key].is_closed:
http_client_cls = TorHTTPClient if self._settings.using_tor_proxy else HTTPClient
hook_log_response = self._log_response if searx_debug else None
log_trace = self._log_trace if searx_debug else None
self._clients[key] = http_client_cls(
verify=self._settings.verify,
enable_http=self._settings.enable_http,
enable_http2=self._settings.enable_http2,
max_connections=self._settings.max_connections,
max_keepalive_connections=self._settings.max_keepalive_connections,
keepalive_expiry=self._settings.keepalive_expiry,
proxies=dict(proxies),
local_addresses=local_addresses,
retry_on_http_error=self._settings.retry_on_http_error,
hook_log_response=hook_log_response,
log_trace=log_trace,
logger=self._logger,
)
return self._clients[key]
def _get_local_addresses_cycle(self):
"""Never-ending generator of IP addresses"""
while True:
at_least_one = False
for address in self._settings.local_addresses:
if isinstance(address, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
for a in address.hosts():
yield str(a)
at_least_one = True
else:
yield str(address)
at_least_one = True
if not at_least_one:
# IPv4Network.hosts() and IPv6Network.hosts() might not yield any IP address.
# at_least_one makes sure the generator does not turn into an infinite loop without yielding
yield None
def _get_proxy_cycles(self):
"""Never-ending generator of proxy configurations.
Each iteration returns tuples of tuples.
Semantically, this is a dictionary where
* keys are the mount points (see https://www.python-httpx.org/advanced/#mounting-transports )
* values are the proxy URLs.
This private method returns a tuple instead of a dictionary to be hashable.
See the line `key = (local_addresses, proxies)` above.
For example, if settings.yml contains:
```yaml
proxies: socks5h://localhost:1337
```
This is equivalent to
```yaml
proxies:
- all://: socks5h://localhost:1337
```
And this method always returns:
* `(('all://', 'socks5h://localhost:1337'),)`
Another example:
```yaml
proxies:
- all://: socks5h://localhost:1337
- https://bing.com:
- socks5h://localhost:4000
- socks5h://localhost:5000
```
In this example, this method alternately returns these two responses:
* `(('all://', 'socks5h://localhost:1337'), ('https://bing.com', 'socks5h://localhost:4000'))`
* `(('all://', 'socks5h://localhost:1337'), ('https://bing.com', 'socks5h://localhost:5000'))`
When no proxies are configured, this method returns an empty tuple at each iteration.
"""
# for each pattern, turn each list of proxy into a cycle
proxy_settings = {pattern: cycle(proxy_urls) for pattern, proxy_urls in (self._settings.proxies).items()}
while True:
# pylint: disable=stop-iteration-return
# ^^ is it a pylint bug ?
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
def _log_response(self, response: httpx.Response):
"""Logs from httpx are disabled. Log the HTTP response with the logger from the network"""
request = response.request
status = f"{response.status_code} {response.reason_phrase}"
response_line = f"{response.http_version} {status}"
content_type = response.headers.get("Content-Type")
content_type = f' ({content_type})' if content_type else ''
self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
def _log_trace(self, name: str, info: Mapping[str, Any]) -> None:
"""Log the actual source / dest IPs and SSL cipher.
Note: does not work with socks proxy
See
* https://www.encode.io/httpcore/extensions/
* https://github.com/encode/httpx/blob/e874351f04471029b2c5dcb2d0b50baccc7b9bc0/httpx/_main.py#L207
"""
if name == "connection.connect_tcp.complete":
stream = info["return_value"]
server_addr = stream.get_extra_info("server_addr")
client_addr = stream.get_extra_info("client_addr")
self._logger.debug(f"* Connected from {client_addr[0]!r} to {server_addr[0]!r} on port {server_addr[1]}")
elif name == "connection.start_tls.complete": # pragma: no cover
stream = info["return_value"]
ssl_object = stream.get_extra_info("ssl_object")
version = ssl_object.version()
cipher = ssl_object.cipher()
alpn = ssl_object.selected_alpn_protocol()
self._logger.debug(f"* SSL established using {version!r} / {cipher[0]!r}, ALPN protocol: {alpn!r}")
elif name == "http2.send_request_headers.started":
self._logger.debug(f"* HTTP/2 stream_id: {info['stream_id']}")
def __repr__(self):
return f"<{self.__class__.__name__} logger_name={self._settings.logger_name!r}>"
class NetwortSettingsDecoder:
"""Convert a description of a network in settings.yml to a NetworkSettings instance"""
# requests compatibility when reading proxy settings from settings.yml
PROXY_PATTERN_MAPPING = {
'http': 'http://',
@ -33,395 +274,228 @@ PROXY_PATTERN_MAPPING = {
'socks5h:': 'socks5h://',
}
ADDRESS_MAPPING = {'ipv4': '0.0.0.0', 'ipv6': '::'}
class Network:
__slots__ = (
'enable_http',
'verify',
'enable_http2',
'max_connections',
'max_keepalive_connections',
'keepalive_expiry',
'local_addresses',
'proxies',
'using_tor_proxy',
'max_redirects',
'retries',
'retry_on_http_error',
'_local_addresses_cycle',
'_proxies_cycle',
'_clients',
'_logger',
)
_TOR_CHECK_RESULT = {}
def __init__(
# pylint: disable=too-many-arguments
self,
enable_http=True,
verify=True,
enable_http2=False,
max_connections=None,
max_keepalive_connections=None,
keepalive_expiry=None,
proxies=None,
using_tor_proxy=False,
local_addresses=None,
retries=0,
retry_on_http_error=None,
max_redirects=30,
logger_name=None,
):
self.enable_http = enable_http
self.verify = verify
self.enable_http2 = enable_http2
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
self.keepalive_expiry = keepalive_expiry
self.proxies = proxies
self.using_tor_proxy = using_tor_proxy
self.local_addresses = local_addresses
self.retries = retries
self.retry_on_http_error = retry_on_http_error
self.max_redirects = max_redirects
self._local_addresses_cycle = self.get_ipaddress_cycle()
self._proxies_cycle = self.get_proxy_cycles()
self._clients = {}
self._logger = logger.getChild(logger_name) if logger_name else logger
self.check_parameters()
def check_parameters(self):
for address in self.iter_ipaddresses():
if '/' in address:
ipaddress.ip_network(address, False)
else:
ipaddress.ip_address(address)
if self.proxies is not None and not isinstance(self.proxies, (str, dict)):
raise ValueError('proxies type has to be str, dict or None')
def iter_ipaddresses(self):
local_addresses = self.local_addresses
if not local_addresses:
return
if isinstance(local_addresses, str):
local_addresses = [local_addresses]
for address in local_addresses:
yield address
def get_ipaddress_cycle(self):
while True:
count = 0
for address in self.iter_ipaddresses():
if '/' in address:
for a in ipaddress.ip_network(address, False).hosts():
yield str(a)
count += 1
else:
a = ipaddress.ip_address(address)
yield str(a)
count += 1
if count == 0:
yield None
def iter_proxies(self):
if not self.proxies:
return
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(self.proxies, str):
yield 'all://', [self.proxies]
else:
for pattern, proxy_url in self.proxies.items():
pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
if isinstance(proxy_url, str):
proxy_url = [proxy_url]
yield pattern, proxy_url
def get_proxy_cycles(self):
proxy_settings = {}
for pattern, proxy_urls in self.iter_proxies():
proxy_settings[pattern] = cycle(proxy_urls)
while True:
# pylint: disable=stop-iteration-return
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
async def log_response(self, response: httpx.Response):
request = response.request
status = f"{response.status_code} {response.reason_phrase}"
response_line = f"{response.http_version} {status}"
content_type = response.headers.get("Content-Type")
content_type = f' ({content_type})' if content_type else ''
self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
@staticmethod
async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool:
if proxies in Network._TOR_CHECK_RESULT:
return Network._TOR_CHECK_RESULT[proxies]
result = True
# ignore client._transport because it is not used with all://
for transport in client._mounts.values(): # pylint: disable=protected-access
if isinstance(transport, AsyncHTTPTransportNoHttp):
@classmethod
def from_dict(cls, network_settings: Dict[str, Any]) -> NetworkSettings:
# Decode the parameters that require it; the other parameters are left as they are
decoders = {
"proxies": cls._decode_proxies,
"local_addresses": cls._decode_local_addresses,
"retry_strategy": cls._decode_retry_strategy,
}
for key, decode_func in decoders.items():
if key not in network_settings:
continue
if getattr(transport, "_pool") and getattr(
transport._pool, "_rdns", False # pylint: disable=protected-access
):
continue
return False
response = await client.get("https://check.torproject.org/api/ip", timeout=60)
if not response.json()["IsTor"]:
result = False
Network._TOR_CHECK_RESULT[proxies] = result
return result
async def get_client(self, verify=None, max_redirects=None):
verify = self.verify if verify is None else verify
max_redirects = self.max_redirects if max_redirects is None else max_redirects
local_address = next(self._local_addresses_cycle)
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
key = (verify, max_redirects, local_address, proxies)
hook_log_response = self.log_response if searx_debug else None
if key not in self._clients or self._clients[key].is_closed:
client = new_client(
self.enable_http,
verify,
self.enable_http2,
self.max_connections,
self.max_keepalive_connections,
self.keepalive_expiry,
dict(proxies),
local_address,
0,
max_redirects,
hook_log_response,
)
if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies):
await client.aclose()
raise httpx.ProxyError('Network configuration problem: not using Tor')
self._clients[key] = client
return self._clients[key]
async def aclose(self):
async def close_client(client):
try:
await client.aclose()
except httpx.HTTPError:
pass
await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
@staticmethod
def extract_kwargs_clients(kwargs):
kwargs_clients = {}
if 'verify' in kwargs:
kwargs_clients['verify'] = kwargs.pop('verify')
if 'max_redirects' in kwargs:
kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
if 'allow_redirects' in kwargs:
# see https://github.com/encode/httpx/pull/1808
kwargs['follow_redirects'] = kwargs.pop('allow_redirects')
return kwargs_clients
@staticmethod
def extract_do_raise_for_httperror(kwargs):
do_raise_for_httperror = True
if 'raise_for_httperror' in kwargs:
do_raise_for_httperror = kwargs['raise_for_httperror']
del kwargs['raise_for_httperror']
return do_raise_for_httperror
@staticmethod
def patch_response(response, do_raise_for_httperror):
if isinstance(response, httpx.Response):
# requests compatibility (response is not streamed)
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error
# raise an exception
if do_raise_for_httperror:
raise_for_httperror(response)
return response
def is_valid_response(self, response):
# pylint: disable=too-many-boolean-expressions
if (
(self.retry_on_http_error is True and 400 <= response.status_code <= 599)
or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error)
or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error)
):
return False
return True
async def call_client(self, stream, method, url, **kwargs):
retries = self.retries
was_disconnected = False
do_raise_for_httperror = Network.extract_do_raise_for_httperror(kwargs)
kwargs_clients = Network.extract_kwargs_clients(kwargs)
while retries >= 0: # pragma: no cover
client = await self.get_client(**kwargs_clients)
try:
if stream:
response = client.stream(method, url, **kwargs)
if network_settings[key] is None:
# None is seen as not set: rely on the default values from NetworkSettings
del network_settings[key]
else:
response = await client.request(method, url, **kwargs)
if self.is_valid_response(response) or retries <= 0:
return Network.patch_response(response, do_raise_for_httperror)
except httpx.RemoteProtocolError as e:
if not was_disconnected:
# the server has closed the connection:
# try again without decreasing the retries variable & with a new HTTP client
was_disconnected = True
await client.aclose()
self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
continue
if retries <= 0:
raise e
except (httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
async def request(self, method, url, **kwargs):
return await self.call_client(False, method, url, **kwargs)
async def stream(self, method, url, **kwargs):
return await self.call_client(True, method, url, **kwargs)
network_settings[key] = decode_func(network_settings[key])
# Relies on the default values of NetworkSettings for unset parameters
return NetworkSettings(**network_settings)
@classmethod
async def aclose_all(cls):
await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False)
def _decode_proxies(cls, proxies) -> Dict[str, List[str]]:
if isinstance(proxies, str):
# for example:
# proxies: socks5://localhost:8000
proxies = {'all://': [proxies]}
elif isinstance(proxies, list):
# for example:
# proxies:
# - socks5h://localhost:8000
# - socks5h://localhost:8001
proxies = {'all://': proxies}
if not isinstance(proxies, dict):
raise ValueError('proxies type has to be str, list, dict or None')
# Here we are sure to have
# proxies = {
# pattern: a_value
# }
# with a_value that can be either a string or a list.
# Now, we make sure that a_value is always a list of strings.
# Also, we keep compatibility with requests regarding the patterns:
# see https://www.python-httpx.org/compatibility/#proxy-keys
result = {}
for pattern, proxy_list in proxies.items():
pattern = cls.PROXY_PATTERN_MAPPING.get(pattern, pattern)
if isinstance(proxy_list, str):
proxy_list = [proxy_list]
if not isinstance(proxy_list, list):
raise ValueError(f'{proxy_list!r}: a list of proxy URLs is expected')
for proxy in proxy_list:
if not isinstance(proxy, str):
raise ValueError(f'{repr(proxy)}: a URL is expected')
result[pattern] = proxy_list
return result
@staticmethod
def _decode_local_addresses(ip_addresses: Union[str, List[str]]) -> List[TYPE_IP_ANY]:
if isinstance(ip_addresses, str):
ip_addresses = [ip_addresses]
if not isinstance(ip_addresses, list):
raise ValueError('IP address must be either None or a string or a list of strings')
# check IP address syntax
result = []
for address in ip_addresses:
if not isinstance(address, str):
raise ValueError(f'{address!r} must be an IP address written as a string')
if '/' in address:
result.append(ipaddress.ip_network(address, False))
else:
result.append(ipaddress.ip_address(address))
return result
@staticmethod
def _decode_retry_strategy(retry_strategy: str) -> RetryStrategy:
for member in RetryStrategy:
if member.name.lower() == retry_strategy.lower():
return member
raise ValueError(f"{retry_strategy} is not a RetryStrategy")
def get_network(name=None):
return NETWORKS.get(name or DEFAULT_NAME)
class NetworkManager:
"""Contains all the Network instances.
By default, there is one default network, so scripts can use searx.network without loading settings.yml.
"""
def check_network_configuration():
async def check():
exception_count = 0
for network in NETWORKS.values():
if network.using_tor_proxy:
try:
await network.get_client()
except Exception: # pylint: disable=broad-except
network._logger.exception('Error') # pylint: disable=protected-access
exception_count += 1
return exception_count
DEFAULT_NAME = '__DEFAULT__'
future = asyncio.run_coroutine_threadsafe(check(), get_loop())
exception_count = future.result()
if exception_count > 0:
raise RuntimeError("Invalid network configuration")
def __init__(self):
# Create a default network so scripts in searxng_extra don't have to load settings.yml
self.networks: Dict[str, Network] = {NetworkManager.DEFAULT_NAME: Network.from_dict()}
def get(self, name: Optional[str] = None):
return self.networks[name or NetworkManager.DEFAULT_NAME]
def initialize(settings_engines=None, settings_outgoing=None):
# pylint: disable=import-outside-toplevel)
from searx.engines import engines
from searx import settings
def initialize_from_settings(self, settings_engines, settings_outgoing, check=True):
# pylint: disable=too-many-branches
from searx.engines import engines # pylint: disable=import-outside-toplevel
# pylint: enable=import-outside-toplevel)
settings_engines = settings_engines or settings['engines']
settings_outgoing = settings_outgoing or settings['outgoing']
# default parameters for AsyncHTTPTransport
# Default parameters for HTTPTransport
# see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # pylint: disable=line-too-long
default_params = {
'enable_http': False,
default_network_settings = {
'verify': settings_outgoing['verify'],
'enable_http': settings_outgoing['enable_http'],
'enable_http2': settings_outgoing['enable_http2'],
'max_connections': settings_outgoing['pool_connections'],
'max_keepalive_connections': settings_outgoing['pool_maxsize'],
'max_connections': settings_outgoing['pool_connections'], # named differently for historical reasons
'max_keepalive_connections': settings_outgoing['pool_maxsize'], # named differently for historical reasons
'keepalive_expiry': settings_outgoing['keepalive_expiry'],
'local_addresses': settings_outgoing['source_ips'],
'using_tor_proxy': settings_outgoing['using_tor_proxy'],
'proxies': settings_outgoing['proxies'],
'max_redirects': settings_outgoing['max_redirects'],
'retries': settings_outgoing['retries'],
'proxies': settings_outgoing['proxies'],
'local_addresses': settings_outgoing['source_ips'], # named differently for historical reasons
'using_tor_proxy': settings_outgoing['using_tor_proxy'],
'retry_on_http_error': None,
}
def new_network(params, logger_name=None):
nonlocal default_params
def new_network(network_settings: Dict[str, Any], logger_name: Optional[str] = None):
nonlocal default_network_settings
result = {}
result.update(default_params)
result.update(params)
result.update(default_network_settings)
result.update(network_settings)
if logger_name:
result['logger_name'] = logger_name
return Network(**result)
return Network.from_dict(**result)
def iter_networks():
nonlocal settings_engines
# ipv4 and ipv6 are always defined
self.networks = {
NetworkManager.DEFAULT_NAME: new_network({}, logger_name='default'),
'ipv4': new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4'),
'ipv6': new_network({'local_addresses': '::'}, logger_name='ipv6'),
}
# define networks from outgoing.networks. Example of configuration:
#
# outgoing:
# networks:
# my_proxy:
# proxies: http://localhost:1337
#
for network_name, network_dict in settings_outgoing['networks'].items():
self.networks[network_name] = new_network(network_dict, logger_name=network_name)
# Get the engine network settings directly from the engine modules and settings.yml (not as NetworkSettings)
engine_network_dict_settings = {}
for engine_spec in settings_engines:
engine_name = engine_spec['name']
engine = engines.get(engine_name)
if engine is None:
continue
network = getattr(engine, 'network', None)
yield engine_name, engine, network
engine_network_dict_settings[engine_name] = self._get_engine_network_settings(
engine_name, engine, default_network_settings
)
if NETWORKS:
done()
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default')
NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4')
NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6')
# Define networks from engines.[i].network (except references)
for engine_name, network_dict in engine_network_dict_settings.items():
if isinstance(network_dict, dict):
self.networks[engine_name] = new_network(network_dict, logger_name=engine_name)
# define networks from outgoing.networks
for network_name, network in settings_outgoing['networks'].items():
NETWORKS[network_name] = new_network(network, logger_name=network_name)
# Define networks from engines.[i].network (only references)
for engine_name, network_dict in engine_network_dict_settings.items():
if isinstance(network_dict, str):
self.networks[engine_name] = self.networks[network_dict]
# define networks from engines.[i].network (except references)
for engine_name, engine, network in iter_networks():
if network is None:
network = {}
for attribute_name, attribute_value in default_params.items():
if hasattr(engine, attribute_name):
network[attribute_name] = getattr(engine, attribute_name)
else:
network[attribute_name] = attribute_value
NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
elif isinstance(network, dict):
NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
# define networks from engines.[i].network (references)
for engine_name, engine, network in iter_networks():
if isinstance(network, str):
NETWORKS[engine_name] = NETWORKS[network]
# the /image_proxy endpoint has a dedicated network.
# same parameters as the default network, but HTTP/2 is disabled.
# It decreases the CPU load average, and the total time is more or less the same
if 'image_proxy' not in NETWORKS:
image_proxy_params = default_params.copy()
# The /image_proxy endpoint has a dedicated network using the same parameters
# as the default network, but HTTP/2 is disabled. It decreases the CPU load average,
# and the total time is more or less the same.
if 'image_proxy' not in self.networks:
image_proxy_params = default_network_settings.copy()
image_proxy_params['enable_http2'] = False
NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy')
self.networks['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy')
# Define a network for the autocompletion
if 'autocomplete' not in self.networks:
self.networks['autocomplete'] = new_network(default_network_settings, logger_name='autocomplete')
# Check if each network is valid:
# * one HTTP client is instantiated
# --> Tor connectivity is checked if using_tor_proxy is True
if check:
exception_count = 0
for network in self.networks.values():
if not network.check_configuration():
exception_count += 1
if exception_count > 0:
raise RuntimeError("Invalid network configuration")
@staticmethod
def _get_engine_network_settings(engine_name, engine, default_network_settings):
if hasattr(engine, 'network'):
# The network configuration is defined in settings.yml inside a network key.
# For example:
#
# - name: arxiv
# engine: arxiv
# shortcut: arx
# network:
# http2: false
# proxies: socks5h://localhost:1337
#
network = getattr(engine, 'network', None)
if not isinstance(network, (dict, str)):
raise ValueError(f'Engine {engine_name}: network must be a dictionary or string')
return network
# The network settings are mixed with the other engine settings.
# The code checks if the keys from default_network_settings are defined in the engine module
#
# For example:
#
# - name: arxiv
# engine: arxiv
# shortcut: arx
# http2: false
# proxies: socks5h://localhost:1337
#
return {
attribute_name: getattr(engine, attribute_name)
for attribute_name in default_network_settings.keys()
if hasattr(engine, attribute_name)
}
@atexit.register
def done():
"""Close all HTTP client
Avoid a warning at exit
See https://github.com/encode/httpx/pull/2026
Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__.
So Network.aclose is called here using atexit.register
"""
try:
loop = get_loop()
if loop:
future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), loop)
# wait 3 seconds to close the HTTP clients
future.result(3)
finally:
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = Network()
NETWORKS = NetworkManager()
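# A minimal initialization sketch (assumption: settings.yml has already been loaded by
# searx.settings). The webapp does this once via searx.search.initialize(); afterwards
# any module can fetch a named network and open a context.
def sketch_networks_bootstrap():
    from searx import settings  # pylint: disable=import-outside-toplevel

    NETWORKS.initialize_from_settings(settings['engines'], settings['outgoing'], check=False)
    image_proxy_context = NETWORKS.get('image_proxy').get_context(timeout=10.0)
    return image_proxy_context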

View File

@ -4,12 +4,12 @@
"""
from searx import get_setting
from searx.exceptions import (
SearxEngineAccessDeniedException,
SearxEngineCaptchaException,
SearxEngineTooManyRequestsException,
SearxEngineAccessDeniedException,
)
from searx import get_setting
def is_cloudflare_challenge(resp):

View File

@ -19,7 +19,7 @@ from searx import logger
from searx.plugins import plugins
from searx.search.models import EngineRef, SearchQuery
from searx.engines import load_engines
from searx.network import initialize as initialize_network, check_network_configuration
from searx.network import NETWORKS
from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time
from searx.search.processors import PROCESSORS, initialize as initialize_processors
from searx.search.checker import initialize as initialize_checker
@ -31,9 +31,7 @@ logger = logger.getChild('search')
def initialize(settings_engines=None, enable_checker=False, check_network=False, enable_metrics=True):
settings_engines = settings_engines or settings['engines']
load_engines(settings_engines)
initialize_network(settings_engines, settings['outgoing'])
if check_network:
check_network_configuration()
NETWORKS.initialize_from_settings(settings_engines, settings['outgoing'], check=check_network)
initialize_metrics([engine['name'] for engine in settings_engines], enable_metrics)
initialize_processors(settings_engines)
if enable_checker:

View File

@ -12,7 +12,8 @@ from urllib.parse import urlparse
import re
import httpx
from searx import network, logger
from searx import logger
from searx.network import NETWORKS
from searx.utils import gen_useragent, detect_language
from searx.results import ResultContainer
from searx.search.models import SearchQuery, EngineRef
@ -72,8 +73,8 @@ def _download_and_check_if_image(image_url: str) -> bool:
a = time()
try:
# use "image_proxy" (avoid HTTP/2)
network.set_context_network_name('image_proxy')
r, stream = network.stream(
network_context = NETWORKS.get('image_proxy').get_context()
r = network_context.stream(
'GET',
image_url,
timeout=10.0,
@ -96,7 +97,6 @@ def _download_and_check_if_image(image_url: str) -> bool:
else:
is_image = False
del r
del stream
return is_image
except httpx.TimeoutException:
logger.error('Timeout for %s: %i', image_url, int(time() - a))

View File

@ -12,7 +12,7 @@ from typing import Dict, Union
from searx import settings, logger
from searx.engines import engines
from searx.network import get_time_for_thread, get_network
from searx.network import NETWORKS
from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
from searx.utils import get_engine_from_settings
@ -66,7 +66,7 @@ class EngineProcessor(ABC):
self.engine = engine
self.engine_name = engine_name
self.logger = engines[engine_name].logger
key = get_network(self.engine_name)
key = NETWORKS.get(self.engine_name)
key = id(key) if key else self.engine_name
self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
@ -107,26 +107,25 @@ class EngineProcessor(ABC):
suspended_time = exception_or_message.suspended_time
self.suspended_status.suspend(suspended_time, error_message) # pylint: disable=no-member
def _extend_container_basic(self, result_container, start_time, search_results):
def _extend_container_basic(self, result_container, start_time, search_results, network_time=None):
# update result_container
result_container.extend(self.engine_name, search_results)
engine_time = default_timer() - start_time
page_load_time = get_time_for_thread()
result_container.add_timing(self.engine_name, engine_time, page_load_time)
result_container.add_timing(self.engine_name, engine_time, network_time)
# metrics
counter_inc('engine', self.engine_name, 'search', 'count', 'successful')
histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total')
if page_load_time is not None:
histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http')
if network_time is not None:
histogram_observe(network_time, 'engine', self.engine_name, 'time', 'http')
def extend_container(self, result_container, start_time, search_results):
def extend_container(self, result_container, start_time, search_results, network_time=None):
if getattr(threading.current_thread(), '_timeout', False):
# the main thread is not waiting anymore
self.handle_exception(result_container, 'timeout', None)
else:
# check if the engine accepted the request
if search_results is not None:
self._extend_container_basic(result_container, start_time, search_results)
self._extend_container_basic(result_container, start_time, search_results, network_time)
self.suspended_status.resume()
def extend_container_if_suspended(self, result_container):

View File

@ -9,6 +9,8 @@
from timeit import default_timer
import asyncio
import ssl
from typing import Dict, List
import httpx
import searx.network
@ -42,13 +44,8 @@ class OnlineProcessor(EngineProcessor):
engine_type = 'online'
def initialize(self):
# set timeout for all HTTP requests
searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer())
# reset the HTTP total time
searx.network.reset_time_for_thread()
# set the network
searx.network.set_context_network_name(self.engine_name)
super().initialize()
with searx.network.networkcontext_for_thread(self.engine_name, self.engine.timeout) as network_context:
network_context.call(super().initialize)
def get_params(self, search_query, engine_category):
"""Returns a set of :ref:`request params <engine request online>` or ``None``
@ -112,6 +109,7 @@ class OnlineProcessor(EngineProcessor):
else:
req = searx.network.post
if params['data']:
request_args['data'] = params['data']
# send the request
@ -133,7 +131,7 @@ class OnlineProcessor(EngineProcessor):
return response
def _search_basic(self, query, params):
def _search_basic(self, query, params) -> List[Dict]:
# update request parameters dependent on
# search-engine (contained in engines folder)
self.engine.request(query, params)
@ -153,21 +151,20 @@ class OnlineProcessor(EngineProcessor):
return self.engine.response(response)
def search(self, query, params, result_container, start_time, timeout_limit):
# set timeout for all HTTP requests
searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time)
# reset the HTTP total time
searx.network.reset_time_for_thread()
# set the network
searx.network.set_context_network_name(self.engine_name)
try:
with searx.network.networkcontext_for_thread(
self.engine_name, timeout_limit, start_time
) as network_context:
# send requests and parse the results
search_results = self._search_basic(query, params)
self.extend_container(result_container, start_time, search_results)
search_results = network_context.call(self._search_basic, query, params)
# extend_container in the network context to get the HTTP runtime
self.extend_container(
result_container, start_time, search_results, network_time=network_context.get_http_runtime()
)
except ssl.SSLError as e:
# requests timeout (connect or read)
self.handle_exception(result_container, e, suspend=True)
self.logger.error("SSLError {}, verify={}".format(e, searx.network.get_network(self.engine_name).verify))
self.logger.error("SSLError {}, verify={}".format(e, searx.network.NETWORKS.get(self.engine_name).verify))
except (httpx.TimeoutException, asyncio.TimeoutError) as e:
# requests timeout (connect or read)
self.handle_exception(result_container, e, suspend=True)

View File

@ -209,9 +209,11 @@ SCHEMA = {
'outgoing': {
'useragent_suffix': SettingsValue(str, ''),
'request_timeout': SettingsValue(numbers.Real, 3.0),
'enable_http2': SettingsValue(bool, True),
'verify': SettingsValue((bool, str), True),
'max_request_timeout': SettingsValue((None, numbers.Real), None),
# default network
'verify': SettingsValue((bool, str), True),
'enable_http': SettingsValue(bool, False),
'enable_http2': SettingsValue(bool, True),
'pool_connections': SettingsValue(int, 100),
'pool_maxsize': SettingsValue(int, 10),
'keepalive_expiry': SettingsValue(numbers.Real, 5.0),

View File

@ -128,7 +128,7 @@ from searx.autocomplete import search_autocomplete, backends as autocomplete_bac
from searx.redisdb import initialize as redis_initialize
from searx.sxng_locales import sxng_locales
from searx.search import SearchWithPlugins, initialize as search_initialize
from searx.network import stream as http_stream, set_context_network_name
from searx.network import NETWORKS
from searx.search.checker import get_result as checker_get_result
logger = logger.getChild('webapp')
@ -1050,8 +1050,8 @@ def image_proxy():
'Sec-GPC': '1',
'DNT': '1',
}
set_context_network_name('image_proxy')
resp, stream = http_stream(method='GET', url=url, headers=request_headers, allow_redirects=True)
network_context = NETWORKS.get('image_proxy').get_context()
resp = network_context.stream(method='GET', url=url, headers=request_headers, allow_redirects=True, timeout=30)
content_length = resp.headers.get('Content-Length')
if content_length and content_length.isdigit() and int(content_length) > maximum_size:
return 'Max size', 400
@ -1082,18 +1082,19 @@ def image_proxy():
logger.exception('HTTP error on closing')
def close_stream():
nonlocal resp, stream
nonlocal resp
try:
if resp:
resp.close()
del resp
del stream
except httpx.HTTPError as e:
logger.debug('Exception while closing response', e)
try:
headers = dict_subset(resp.headers, {'Content-Type', 'Content-Encoding', 'Content-Length', 'Length'})
response = Response(stream, mimetype=resp.headers['Content-Type'], headers=headers, direct_passthrough=True)
response = Response(
resp.iter_bytes(), mimetype=resp.headers['Content-Type'], headers=headers, direct_passthrough=True
)
response.call_on_close(close_stream)
return response
except httpx.HTTPError:

View File

@ -13,14 +13,15 @@ Output file: :origin:`searx/data/ahmia_blacklist.txt` (:origin:`CI Update data
from os.path import join
import requests
from searx import searx_dir
from searx.network import provide_networkcontext, get
URL = 'https://ahmia.fi/blacklist/banned/'
@provide_networkcontext()
def fetch_ahmia_blacklist():
resp = requests.get(URL, timeout=3.0)
resp = get(URL, timeout=3.0)
if resp.status_code != 200:
# pylint: disable=broad-exception-raised
raise Exception("Error fetching Ahmia blacklist, HTTP code " + resp.status_code)

View File

@ -21,6 +21,7 @@ from os.path import join
from searx import searx_dir
from searx.locales import LOCALE_NAMES, locales_initialize
from searx.engines import wikidata, set_loggers
from searx.network import provide_networkcontext
set_loggers(wikidata, 'wikidata')
locales_initialize()
@ -138,8 +139,8 @@ def get_filename():
return join(join(searx_dir, "data"), "currencies.json")
@provide_networkcontext()
def main():
db = fetch_db()
# static

View File

@ -206,7 +206,6 @@ def initialize():
def fetch_wikidata_descriptions():
print('Fetching wikidata descriptions')
searx.network.set_timeout_for_thread(60)
result = wikidata.send_wikidata_query(
SPARQL_DESCRIPTION.replace('%IDS%', IDS).replace('%LANGUAGES_SPARQL%', LANGUAGES_SPARQL)
)
@ -355,6 +354,7 @@ def get_output():
return output
@searx.network.provide_networkcontext()
def main():
initialize()
fetch_wikidata_descriptions()

View File

@ -75,6 +75,7 @@ lang2emoji = {
}
@network.provide_networkcontext()
def main():
load_engines(settings['engines'])
# traits_map = EngineTraitsMap.from_data()
@ -85,7 +86,6 @@ def main():
def fetch_traits_map():
"""Fetchs supported languages for each engine and writes json file with those."""
network.set_timeout_for_thread(10.0)
def log(msg):
print(msg)

View File

@ -22,8 +22,7 @@ import json
import re
from os.path import join
import httpx
import searx.network
from searx import searx_dir # pylint: disable=E0401 C0413
from searx.external_bang import LEAF_KEY
@ -35,7 +34,7 @@ HTTP_COLON = 'http:'
def get_bang_url():
response = httpx.get(URL_BV1)
response = searx.network.get(URL_BV1)
response.raise_for_status()
r = RE_BANG_VERSION.findall(response.text)
@ -43,7 +42,7 @@ def get_bang_url():
def fetch_ddg_bangs(url):
response = httpx.get(url)
response = searx.network.get(url)
response.raise_for_status()
return json.loads(response.content.decode())
@ -155,9 +154,14 @@ def get_bangs_filename():
return join(join(searx_dir, "data"), "external_bangs.json")
if __name__ == '__main__':
@searx.network.provide_networkcontext()
def main():
bangs_url, bangs_version = get_bang_url()
print(f'fetch bangs from {bangs_url}')
output = {'version': bangs_version, 'trie': parse_ddg_bangs(fetch_ddg_bangs(bangs_url))}
with open(get_bangs_filename(), 'w', encoding="utf8") as fp:
json.dump(output, fp, ensure_ascii=False, indent=4)
with open(get_bangs_filename(), 'w', encoding="utf8") as f:
json.dump(output, f, ensure_ascii=False, indent=4)
if __name__ == '__main__':
main()

View File

@ -15,9 +15,9 @@ from os.path import join
from urllib.parse import urlparse, urljoin
from packaging.version import parse
import requests
from lxml import html
from searx import searx_dir
from searx import network
URL = 'https://ftp.mozilla.org/pub/firefox/releases/'
RELEASE_PATH = '/pub/firefox/releases/'
@ -37,8 +37,9 @@ useragents = {
}
@network.provide_networkcontext()
def fetch_firefox_versions():
resp = requests.get(URL, timeout=2.0)
resp = network.get(URL, timeout=2.0)
if resp.status_code != 200:
# pylint: disable=broad-exception-raised
raise Exception("Error fetching firefox versions, HTTP code " + resp.status_code)

View File

@ -48,8 +48,8 @@ import collections
from pathlib import Path
from searx import searx_dir
from searx.network import set_timeout_for_thread
from searx.engines import wikidata, set_loggers
from searx.network import provide_networkcontext
from searx.sxng_locales import sxng_locales
from searx.engines.openstreetmap import get_key_rank, VALUE_TO_LINK
@ -207,12 +207,15 @@ def get_osm_tags_filename():
return Path(searx_dir) / "data" / "osm_keys_tags.json"
if __name__ == '__main__':
set_timeout_for_thread(60)
@provide_networkcontext()
def main():
result = {
'keys': optimize_keys(get_keys()),
'tags': optimize_tags(get_tags()),
}
with open(get_osm_tags_filename(), 'w', encoding="utf8") as f:
json.dump(result, f, indent=4, ensure_ascii=False, sort_keys=True)
if __name__ == '__main__':
main()

View File

@ -18,6 +18,7 @@ from os.path import join
from searx import searx_dir
from searx.engines import wikidata, set_loggers
from searx.network import provide_networkcontext
set_loggers(wikidata, 'wikidata')
@ -45,6 +46,7 @@ ORDER BY ?item DESC(?rank) ?symbol
"""
@provide_networkcontext()
def get_data():
results = collections.OrderedDict()
response = wikidata.send_wikidata_query(SARQL_REQUEST)

View File

@ -1,6 +1,6 @@
import os
import aiounittest
import unittest
os.environ.pop('SEARX_DEBUG', None)
os.environ.pop('SEARX_DEBUG_LOG_LEVEL', None)
@ -36,7 +36,7 @@ class SearxTestLayer:
pass
class SearxTestCase(aiounittest.AsyncTestCase):
class SearxTestCase(unittest.TestCase):
"""Base test case for non-robot tests."""
layer = SearxTestLayer

View File

@ -0,0 +1,97 @@
server:
secret_key: "user_settings_secret"
outgoing:
networks:
http00:
enable_http: false
enable_http2: false
http10:
enable_http: true
enable_http2: false
http01:
enable_http: false
enable_http2: true
http11:
enable_http: true
enable_http2: true
sock5h:
proxies: socks5h://127.0.0.1:4000
sock5:
proxies: socks5://127.0.0.1:4000
sock4:
proxies: socks4://127.0.0.1:4000
engines:
#
- name: enginea
shortcut: a
engine: dummy
network:
proxies: http://127.0.0.1:8000
- name: engineb
shortcut: b
engine: dummy
proxies: http://127.0.0.1:8000
- name: enginec
shortcut: c
engine: dummy
network:
proxies:
all://: http://127.0.0.1:8000
- name: engined
shortcut: d
engine: dummy
network:
proxies:
all://:
- http://127.0.0.1:8000
- http://127.0.0.1:9000
- name: enginee
shortcut: e
engine: dummy
network:
proxies:
all://:
- http://127.0.0.1:8000
- http://127.0.0.1:9000
example.com://:
- http://127.0.0.1:6000
- http://127.0.0.1:7000
- name: enginef
shortcut: f
engine: dummy
network:
proxies:
- http://127.0.0.1:8000
- http://127.0.0.1:9000
- name: engineg
shortcut: g
engine: dummy
network:
local_addresses:
- 192.168.0.1
- 192.168.0.200
- name: engineh
shortcut: h
engine: dummy
network:
local_addresses: 192.168.0.2
- name: ip2
shortcut: ip2
engine: dummy
network:
local_addresses: 192.168.0.1/30
- name: enginei
shortcut: i
engine: dummy
network:
retry_strategy: engine
- name: enginej
shortcut: j
engine: dummy
network:
retry_strategy: SAME_HTTP_CLIENT
- name: enginek
shortcut: k
engine: dummy
network:
retry_strategy: DIFFERENT_HTTP_CLIENT

View File

@ -1,56 +1,81 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
from typing import Optional
from mock import patch
from parameterized import parameterized, parameterized_class
import time
import httpx
from searx.network.network import Network, NETWORKS, initialize
import searx.network
import searx.network.context
from searx import settings
from searx.network.client import BaseHTTPClient, HTTPClient, TorHTTPClient, _HTTPMultiClientConf
from searx.network.network import Network, NETWORKS
from tests import SearxTestCase
class TestHTTPClient(SearxTestCase):
def test_get_client(self):
httpclient = BaseHTTPClient(verify=True)
client1 = httpclient._get_client_and_update_kwargs()
client2 = httpclient._get_client_and_update_kwargs(verify=True)
client3 = httpclient._get_client_and_update_kwargs(max_redirects=10)
client4 = httpclient._get_client_and_update_kwargs(verify=True)
client5 = httpclient._get_client_and_update_kwargs(verify=False)
client6 = httpclient._get_client_and_update_kwargs(max_redirects=10)
self.assertEqual(client1, client2)
self.assertEqual(client1, client4)
self.assertNotEqual(client1, client3)
self.assertNotEqual(client1, client5)
self.assertEqual(client3, client6)
httpclient.close()
class TestNetwork(SearxTestCase):
def setUp(self):
initialize()
NETWORKS.initialize_from_settings(settings_engines=settings["engines"], settings_outgoing=settings["outgoing"])
def test_simple(self):
network = Network()
network = Network.from_dict()
self.assertEqual(next(network._local_addresses_cycle), None)
self.assertEqual(next(network._proxies_cycle), ())
def test_ipaddress_cycle(self):
network = NETWORKS['ipv6']
network = NETWORKS.get('ipv6')
self.assertEqual(next(network._local_addresses_cycle), '::')
self.assertEqual(next(network._local_addresses_cycle), '::')
network = NETWORKS['ipv4']
network = NETWORKS.get('ipv4')
self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0')
self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0')
network = Network(local_addresses=['192.168.0.1', '192.168.0.2'])
network = Network.from_dict(local_addresses=['192.168.0.1', '192.168.0.2'])
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
network = Network(local_addresses=['192.168.0.0/30'])
network = Network.from_dict(local_addresses=['192.168.0.0/30'])
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
network = Network(local_addresses=['fe80::/10'])
network = Network.from_dict(local_addresses=['fe80::/10'])
self.assertEqual(next(network._local_addresses_cycle), 'fe80::1')
self.assertEqual(next(network._local_addresses_cycle), 'fe80::2')
self.assertEqual(next(network._local_addresses_cycle), 'fe80::3')
with self.assertRaises(ValueError):
Network(local_addresses=['not_an_ip_address'])
Network.from_dict(local_addresses=['not_an_ip_address'])
def test_proxy_cycles(self):
network = Network(proxies='http://localhost:1337')
network = Network.from_dict(proxies='http://localhost:1337')
self.assertEqual(next(network._proxies_cycle), (('all://', 'http://localhost:1337'),))
network = Network(proxies={'https': 'http://localhost:1337', 'http': 'http://localhost:1338'})
network = Network.from_dict(proxies={'https': 'http://localhost:1337', 'http': 'http://localhost:1338'})
self.assertEqual(
next(network._proxies_cycle), (('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338'))
)
@ -58,7 +83,7 @@ class TestNetwork(SearxTestCase):
next(network._proxies_cycle), (('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338'))
)
network = Network(
network = Network.from_dict(
proxies={'https': ['http://localhost:1337', 'http://localhost:1339'], 'http': 'http://localhost:1338'}
)
self.assertEqual(
@ -69,7 +94,7 @@ class TestNetwork(SearxTestCase):
)
with self.assertRaises(ValueError):
Network(proxies=1)
Network.from_dict(proxies=1)
def test_get_kwargs_clients(self):
kwargs = {
@ -78,120 +103,127 @@ class TestNetwork(SearxTestCase):
'timeout': 2,
'allow_redirects': True,
}
kwargs_client = Network.extract_kwargs_clients(kwargs)
kwargs_client, kwargs = BaseHTTPClient()._extract_kwargs_clients(kwargs)
self.assertEqual(len(kwargs_client), 2)
self.assertEqual(len(kwargs), 2)
self.assertEqual(kwargs['timeout'], 2)
self.assertEqual(kwargs['follow_redirects'], True)
self.assertEqual(kwargs['allow_redirects'], True)
self.assertTrue(kwargs_client['verify'])
self.assertEqual(kwargs_client['max_redirects'], 5)
self.assertIsInstance(kwargs_client, _HTTPMultiClientConf)
self.assertTrue(kwargs_client.verify)
self.assertEqual(kwargs_client.max_redirects, 5)
async def test_get_client(self):
network = Network(verify=True)
client1 = await network.get_client()
client2 = await network.get_client(verify=True)
client3 = await network.get_client(max_redirects=10)
client4 = await network.get_client(verify=True)
client5 = await network.get_client(verify=False)
client6 = await network.get_client(max_redirects=10)
def test_close(self):
network = Network.from_dict(verify=True)
network._get_http_client()
network.close()
self.assertEqual(client1, client2)
self.assertEqual(client1, client4)
self.assertNotEqual(client1, client3)
self.assertNotEqual(client1, client5)
self.assertEqual(client3, client6)
await network.aclose()
async def test_aclose(self):
network = Network(verify=True)
await network.get_client()
await network.aclose()
async def test_request(self):
def test_request(self):
a_text = 'Lorem Ipsum'
response = httpx.Response(status_code=200, text=a_text)
with patch.object(httpx.AsyncClient, 'request', return_value=response):
network = Network(enable_http=True)
response = await network.request('GET', 'https://example.com/')
with patch.object(httpx.Client, 'request', return_value=response):
network = Network.from_dict(enable_http=True)
http_client = network._get_http_client()
response = http_client.request('GET', 'https://example.com/')
self.assertEqual(response.text, a_text)
await network.aclose()
network.close()
@parameterized_class(
[
{"RETRY_STRATEGY": "ENGINE"},
{"RETRY_STRATEGY": "SAME_HTTP_CLIENT"},
{"RETRY_STRATEGY": "DIFFERENT_HTTP_CLIENT"},
]
)
class TestNetworkRequestRetries(SearxTestCase):
TEXT = 'Lorem Ipsum'
TEXT = "Lorem Ipsum"
RETRY_STRATEGY = "Engine"
@classmethod
def get_response_404_then_200(cls):
def get_response_403_then_200(cls):
first = True
async def get_response(*args, **kwargs):
def get_response(*args, **kwargs):
nonlocal first
request = httpx.Request('GET', 'http://localhost')
if first:
first = False
return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT, request=request)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT, request=request)
return get_response
async def test_retries_ok(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
response = await network.request('GET', 'https://example.com/', raise_for_httperror=False)
def test_retries_ok(self):
with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()):
network = Network.from_dict(
enable_http=True, retries=1, retry_on_http_error=403, retry_strategy=self.RETRY_STRATEGY
)
context = network.get_context(timeout=3600.0)
response = context.request('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text, TestNetworkRequestRetries.TEXT)
await network.aclose()
network.close()
async def test_retries_fail_int(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
response = await network.request('GET', 'https://example.com/', raise_for_httperror=False)
def test_retries_fail_int(self):
with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()):
network = Network.from_dict(
enable_http=True, retries=0, retry_on_http_error=403, retry_strategy=self.RETRY_STRATEGY
)
context = network.get_context(timeout=2.0)
response = context.request('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 403)
await network.aclose()
network.close()
async def test_retries_fail_list(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=[403, 429])
response = await network.request('GET', 'https://example.com/', raise_for_httperror=False)
def test_retries_fail_list(self):
with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()):
network = Network.from_dict(
enable_http=True, retries=0, retry_on_http_error=[403, 429], retry_strategy=self.RETRY_STRATEGY
)
context = network.get_context(timeout=2.0)
response = context.request('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 403)
await network.aclose()
network.close()
async def test_retries_fail_bool(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=True)
response = await network.request('GET', 'https://example.com/', raise_for_httperror=False)
def test_retries_fail_bool(self):
with patch.object(httpx.Client, 'request', new=TestNetworkRequestRetries.get_response_403_then_200()):
network = Network.from_dict(
enable_http=True, retries=0, retry_on_http_error=True, retry_strategy=self.RETRY_STRATEGY
)
context = network.get_context(timeout=2.0)
response = context.request('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 403)
await network.aclose()
network.close()
async def test_retries_exception_then_200(self):
def test_retries_exception_then_200(self):
request_count = 0
async def get_response(*args, **kwargs):
def get_response(*args, **kwargs):
nonlocal request_count
request_count += 1
if request_count < 3:
raise httpx.RequestError('fake exception', request=None)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
with patch.object(httpx.AsyncClient, 'request', new=get_response):
network = Network(enable_http=True, retries=2)
response = await network.request('GET', 'https://example.com/', raise_for_httperror=False)
with patch.object(httpx.Client, 'request', new=get_response):
network = Network.from_dict(enable_http=True, retries=2, retry_strategy=self.RETRY_STRATEGY)
context = network.get_context(timeout=2.0)
response = context.request('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text, TestNetworkRequestRetries.TEXT)
await network.aclose()
network.close()
async def test_retries_exception(self):
async def get_response(*args, **kwargs):
def test_retries_exception(self):
def get_response(*args, **kwargs):
raise httpx.RequestError('fake exception', request=None)
with patch.object(httpx.AsyncClient, 'request', new=get_response):
network = Network(enable_http=True, retries=0)
with patch.object(httpx.Client, 'request', new=get_response):
network = Network.from_dict(enable_http=True, retries=0, retry_strategy=self.RETRY_STRATEGY)
context = network.get_context(timeout=2.0)
with self.assertRaises(httpx.RequestError):
await network.request('GET', 'https://example.com/', raise_for_httperror=False)
await network.aclose()
context.request('GET', 'https://example.com/', raise_for_httperror=False)
network.close()
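# Note (descriptive comment, not part of the original diff): streaming requests go
# through NetworkContext.stream(); the tests below patch httpx.Client.send so both
# the retry behaviour and the returned byte stream can be checked.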
class TestNetworkStreamRetries(SearxTestCase):
@ -200,43 +232,246 @@ class TestNetworkStreamRetries(SearxTestCase):
@classmethod
def get_response_exception_then_200(cls):
from httpx import SyncByteStream
first = True
def stream(*args, **kwargs):
class fake_stream(SyncByteStream):
def __iter__(self):
yield TestNetworkStreamRetries.TEXT.encode()
def send(*args, **kwargs):
nonlocal first
if first:
first = False
raise httpx.RequestError('fake exception', request=None)
return httpx.Response(status_code=200, text=TestNetworkStreamRetries.TEXT)
return httpx.Response(status_code=200, stream=fake_stream())
return stream
return send
async def test_retries_ok(self):
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
response = await network.stream('GET', 'https://example.com/')
self.assertEqual(response.text, TestNetworkStreamRetries.TEXT)
await network.aclose()
def test_retries_ok(self):
with patch.object(httpx.Client, 'send', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network.from_dict(enable_http=True, retries=1, retry_on_http_error=403)
context = network.get_context(timeout=3600.0)
response = context.stream('GET', 'https://example.com/')
btext = b"".join(btext for btext in response.iter_bytes())
self.assertEqual(btext.decode(), TestNetworkStreamRetries.TEXT)
response.close()
network.close()
async def test_retries_fail(self):
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
def test_retries_fail(self):
with patch.object(httpx.Client, 'send', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network.from_dict(enable_http=True, retries=0, retry_on_http_error=403)
context = network.get_context(timeout=2.0)
with self.assertRaises(httpx.RequestError):
await network.stream('GET', 'https://example.com/')
await network.aclose()
context.stream('GET', 'https://example.com/')
network.close()
async def test_retries_exception(self):
def test_retries_exception(self):
first = True
def stream(*args, **kwargs):
def send(*args, **kwargs):
nonlocal first
if first:
first = False
return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
with patch.object(httpx.AsyncClient, 'stream', new=stream):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
response = await network.stream('GET', 'https://example.com/', raise_for_httperror=False)
with patch.object(httpx.Client, 'send', new=send):
network = Network.from_dict(enable_http=True, retries=0, retry_on_http_error=403)
context = network.get_context(timeout=2.0)
response = context.stream('GET', 'https://example.com/', raise_for_httperror=False)
self.assertEqual(response.status_code, 403)
await network.aclose()
network.close()
class TestNetworkApi(SearxTestCase):
TEXT = 'Lorem Ipsum'
def test_no_networkcontext(self):
with self.assertRaises(searx.network.NetworkContextNotFound):
searx.network.request("GET", "https://example.com")
def test_provide_networkcontext(self):
send_was_called = False
response = None
def send(*args, **kwargs):
nonlocal send_was_called
send_was_called = True
return httpx.Response(status_code=200, text=TestNetworkApi.TEXT)
@searx.network.provide_networkcontext()
def main():
nonlocal response
response = searx.network.get("https://example.com")
with patch.object(httpx.Client, 'send', new=send):
main()
self.assertTrue(send_was_called)
self.assertIsInstance(response, httpx.Response)
self.assertEqual(response.text, TestNetworkApi.TEXT)
@parameterized.expand(
[
("OPTIONS",),
("HEAD",),
("DELETE",),
]
)
def test_options(self, method):
send_was_called = False
request: Optional[httpx.Request] = None
response = None
def send(_, actual_request: httpx.Request, **kwargs):
nonlocal request, send_was_called
request = actual_request
send_was_called = True
return httpx.Response(status_code=200, text=TestNetworkApi.TEXT)
@searx.network.provide_networkcontext()
def main():
nonlocal response
f = getattr(searx.network, method.lower())
response = f("https://example.com", params={"a": "b"}, headers={"c": "d"})
with patch.object(httpx.Client, 'send', new=send):
main()
self.assertTrue(send_was_called)
self.assertIsInstance(response, httpx.Response)
self.assertEqual(request.method, method)
self.assertEqual(request.url, "https://example.com?a=b")
self.assertEqual(request.headers["c"], "d")
self.assertEqual(response.text, TestNetworkApi.TEXT)
@parameterized.expand(
[
("POST",),
("PUT",),
("PATCH",),
]
)
def test_post(self, method):
send_was_called = False
request: Optional[httpx.Request] = None
response = None
data = "this_is_data"
def send(_, actual_request: httpx.Request, **kwargs):
nonlocal request, send_was_called
request = actual_request
send_was_called = True
return httpx.Response(status_code=200, text=TestNetworkApi.TEXT)
@searx.network.provide_networkcontext()
def main():
nonlocal response
f = getattr(searx.network, method.lower())
response = f("https://example.com", params={"a": "b"}, headers={"c": "d"}, data=data)
with patch.object(httpx.Client, 'send', new=send):
main()
self.assertTrue(send_was_called)
self.assertIsInstance(response, httpx.Response)
self.assertEqual(request.method, method)
self.assertEqual(request.url, "https://example.com?a=b")
self.assertEqual(request.headers["c"], "d")
self.assertEqual(request.content, data.encode())
self.assertEqual(response.text, TestNetworkApi.TEXT)
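# Note (descriptive comment, not part of the original diff): the NetworkContext keeps
# track of how long the HTTP requests took (get_http_runtime) and how much of the
# overall timeout is left (get_remaining_time), which is what the next test checks.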
def test_get_remaining_time(self):
def send(*args, **kwargs):
time.sleep(0.25)
return httpx.Response(status_code=200, text=TestNetworkApi.TEXT)
with patch.object(httpx.Client, 'send', new=send):
with searx.network.networkcontext_for_thread(timeout=3.0) as network_context:
network_context.request("GET", "https://example.com")
network_context.get_http_runtime()
self.assertTrue(network_context.get_http_runtime() > 0.25)
overhead = 0.2 # see NetworkContext.get_remaining_time
self.assertTrue(network_context.get_remaining_time() < (2.75 + overhead))
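# Note (descriptive comment, not part of the original diff): the tests below pin down
# the expected repr() of Network, NetworkContext and the wrapped HTTP client.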
class TestNetworkRepr(SearxTestCase):
def test_repr(self):
network = Network.from_dict(logger_name="test", retry_strategy="ENGINE")
network_context = network.get_context(5.0)
network_context._set_http_client()
http_client = network_context._get_http_client()
r_network = repr(network)
r_network_context = repr(network_context)
r_http_client = repr(http_client)
self.assertEqual(r_network, "<Network logger_name='test'>")
self.assertTrue(r_network_context.startswith("<NetworkContextRetryFunction retries=0 timeout=5.0 "))
self.assertTrue(r_network_context.endswith("network_context=<Network logger_name='test'>>"))
self.assertTrue(r_http_client.startswith("<searx.network.context._RetryFunctionHTTPClient"))
def test_repr_no_network(self):
def http_client_factory():
return HTTPClient()
network_context = searx.network.context.NetworkContextRetryFunction(3, http_client_factory, 1.0, 2.0)
r_network_context = repr(network_context)
self.assertTrue(
r_network_context.startswith("<NetworkContextRetryFunction retries=3 timeout=2.0 http_client=None")
)
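# Note (descriptive comment, not part of the original diff): TorHTTPClient checks at
# construction time that the configured proxy really routes traffic through Tor
# ("IsTor" in the JSON check response) and raises httpx.HTTPError when it does not.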
class TestTorHTTPClient(SearxTestCase):
API_RESPONSE_FALSE = '{"IsTor":false,"IP":"42.42.42.42"}'
API_RESPONSE_TRUE = '{"IsTor":true,"IP":"42.42.42.42"}'
@parameterized.expand(
[
({"all://": "socks5://localhost:4000"},),
({"all://": "socks5h://localhost:4000"},),
({"all://": "http://localhost:5000"},),
({"all://": "https://localhost:5000"},),
(None,),
]
)
def test_without_tor(self, proxies):
check_done = False
def send(*args, **kwargs):
nonlocal check_done
check_done = True
return httpx.Response(status_code=200, text=TestTorHTTPClient.API_RESPONSE_FALSE)
with patch.object(httpx.Client, 'send', new=send):
TorHTTPClient._clear_cache()
TorHTTPClient._TOR_CHECK_RESULT = {}
with self.assertRaises(httpx.HTTPError):
TorHTTPClient(proxies=proxies)
self.assertTrue(check_done)
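# Note (descriptive comment, not part of the original diff): a successful Tor check is
# cached, so constructing a second TorHTTPClient with the same proxies must not
# trigger another check request (hence check_count == 1 below).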
@parameterized.expand(
[
("socks5h://localhost:8888",),
]
)
def test_with_tor(self, proxy_url):
check_count = 0
def send(*args, **kwargs):
nonlocal check_count
check_count += 1
return httpx.Response(status_code=200, text=TestTorHTTPClient.API_RESPONSE_TRUE)
with patch.object(httpx.Client, 'send', new=send):
proxies = {
"all://": proxy_url,
}
TorHTTPClient._clear_cache()
TorHTTPClient(proxies=proxies, enable_http=False)
TorHTTPClient(proxies=proxies, enable_http=False)
self.assertEqual(check_count, 1)

View File

@ -0,0 +1,31 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
from os.path import dirname, join, abspath
from unittest.mock import patch
from searx import settings_loader, settings_defaults, engines
from tests import SearxTestCase
from searx.network import network
test_dir = abspath(dirname(__file__))
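# Note (descriptive comment, not part of the original diff): load a real settings file
# (network_settings.yml) and make sure NetworkManager can be initialized from the
# engine and outgoing settings without crashing.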
class TestDefaultSettings(SearxTestCase):
def test_load(self):
# TODO : write more tests
# for now, make sure the code does not crash
with patch.dict(settings_loader.environ, {'SEARXNG_SETTINGS_PATH': join(test_dir, 'network_settings.yml')}):
settings, _ = settings_loader.load_settings()
settings_defaults.apply_schema(settings, settings_defaults.SCHEMA, [])
engines.load_engines(settings["engines"])
network_manager = network.NetworkManager()
network_manager.initialize_from_settings(settings["engines"], settings["outgoing"], check=True)
network_enginea = network_manager.get("enginea")
http_client = network_enginea._get_http_client()
repr_network = "<Network logger_name='enginea'>"
self.assertEqual(repr(network_enginea), repr_network)
self.assertTrue(repr_network in repr(http_client))