searx.network: refactoring

Alexandre Flament 2023-08-18 12:16:02 +00:00 committed by Alexandre Flament
parent d4e21dec26
commit 45c217ff6e
24 changed files with 2320 additions and 888 deletions

searx/search/processors/abstract.py

@@ -10,7 +10,7 @@ from typing import Dict, Union
 from searx import settings, logger
 from searx.engines import engines
-from searx.network import get_time_for_thread, get_network
+from searx.network import NETWORKS
 from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
 from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
 from searx.utils import get_engine_from_settings
@@ -64,7 +64,7 @@ class EngineProcessor(ABC):
         self.engine = engine
         self.engine_name = engine_name
         self.logger = engines[engine_name].logger
-        key = get_network(self.engine_name)
+        key = NETWORKS.get(self.engine_name)
         key = id(key) if key else self.engine_name
         self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
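Note: the diff replaces the get_network() helper with a NETWORKS registry object. The registry's internals are not part of this diff; the following is a minimal sketch of the mapping-style interface the call site above relies on, where the class names, the 'default' fallback, and the verify attribute are all assumptions reconstructed from usage:

from typing import Dict, Optional

class Network:
    # Stand-in for searx.network's Network class; the real one wraps an
    # httpx client and carries per-network settings such as TLS verification.
    def __init__(self, verify: bool = True):
        self.verify = verify

class NetworkManager:
    # Mapping-style registry: one Network per engine name.
    def __init__(self):
        self._networks: Dict[str, Network] = {}

    def get(self, name: Optional[str] = None) -> Optional[Network]:
        # NETWORKS.get(engine_name) may return None, which is why the
        # caller above falls back to the engine name as the dict key.
        return self._networks.get(name or 'default')

NETWORKS = NetworkManager()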
@@ -105,26 +105,25 @@ class EngineProcessor(ABC):
             suspended_time = exception_or_message.suspended_time
         self.suspended_status.suspend(suspended_time, error_message)  # pylint: disable=no-member

-    def _extend_container_basic(self, result_container, start_time, search_results):
+    def _extend_container_basic(self, result_container, start_time, search_results, network_time=None):
         # update result_container
         result_container.extend(self.engine_name, search_results)
         engine_time = default_timer() - start_time
-        page_load_time = get_time_for_thread()
-        result_container.add_timing(self.engine_name, engine_time, page_load_time)
+        result_container.add_timing(self.engine_name, engine_time, network_time)
         # metrics
         counter_inc('engine', self.engine_name, 'search', 'count', 'successful')
         histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total')
-        if page_load_time is not None:
-            histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http')
+        if network_time is not None:
+            histogram_observe(network_time, 'engine', self.engine_name, 'time', 'http')

-    def extend_container(self, result_container, start_time, search_results):
+    def extend_container(self, result_container, start_time, search_results, network_time=None):
         if getattr(threading.current_thread(), '_timeout', False):
             # the main thread is not waiting anymore
             self.handle_exception(result_container, 'timeout', None)
         else:
             # check if the engine accepted the request
             if search_results is not None:
-                self._extend_container_basic(result_container, start_time, search_results)
+                self._extend_container_basic(result_container, start_time, search_results, network_time)
             self.suspended_status.resume()

     def extend_container_if_suspended(self, result_container):
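Note: previously the HTTP time came from get_time_for_thread(), i.e. from thread-local state that had to be reset before each search; now the caller passes network_time explicitly. A toy illustration of the difference, with function names that are illustrative rather than taken from the codebase:

import threading
from timeit import default_timer

# Old style: HTTP time accumulated in thread-local storage and read back
# implicitly, which breaks as soon as the work moves to another thread.
THREADLOCAL = threading.local()

def old_get_time_for_thread():
    return getattr(THREADLOCAL, 'total_time', None)

# New style: whoever measured the HTTP time hands it over as an argument,
# so the value is correct regardless of which thread runs the code.
def add_timing(engine_name, start_time, network_time=None):
    engine_time = default_timer() - start_time
    return engine_name, engine_time, network_time

print(add_timing('example', default_timer(), network_time=0.25))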

searx/search/processors/online.py

@@ -7,6 +7,8 @@
 from timeit import default_timer
 import asyncio
 import ssl
+from typing import Dict, List
+
 import httpx

 import searx.network
@@ -40,13 +42,8 @@ class OnlineProcessor(EngineProcessor):
     engine_type = 'online'

     def initialize(self):
-        # set timeout for all HTTP requests
-        searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer())
-        # reset the HTTP total time
-        searx.network.reset_time_for_thread()
-        # set the network
-        searx.network.set_context_network_name(self.engine_name)
-        super().initialize()
+        with searx.network.networkcontext_manager(self.engine_name, self.engine.timeout) as network_context:
+            network_context.call(super().initialize)

     def get_params(self, search_query, engine_category):
         """Returns a set of :ref:`request params <engine request online>` or ``None``
@@ -110,7 +107,8 @@
         else:
             req = searx.network.post
-            request_args['data'] = params['data']
+            if params['data']:
+                request_args['data'] = params['data']

         # send the request
         response = req(params['url'], **request_args)
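Note: the new guard matters because request_args is forwarded to the request function via **request_args: with the guard, an empty params['data'] means the data key is never set and no form body is attached, instead of an empty value being passed through. A trivial illustration:

request_args = {}
data = {}  # engine left params['data'] empty
if data:  # falsy, so no body is attached to the request
    request_args['data'] = data
assert 'data' not in request_args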
@@ -131,7 +129,7 @@
         return response

-    def _search_basic(self, query, params):
+    def _search_basic(self, query, params) -> List[Dict]:
         # update request parameters dependent on
         # search-engine (contained in engines folder)
         self.engine.request(query, params)
@@ -151,21 +149,18 @@
         return self.engine.response(response)

     def search(self, query, params, result_container, start_time, timeout_limit):
-        # set timeout for all HTTP requests
-        searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time)
-        # reset the HTTP total time
-        searx.network.reset_time_for_thread()
-        # set the network
-        searx.network.set_context_network_name(self.engine_name)
-
         try:
-            # send requests and parse the results
-            search_results = self._search_basic(query, params)
-            self.extend_container(result_container, start_time, search_results)
+            with searx.network.networkcontext_manager(self.engine_name, timeout_limit, start_time) as network_context:
+                # send requests and parse the results
+                search_results = network_context.call(self._search_basic, query, params)
+                # extend_container in the network context to get the HTTP runtime
+                self.extend_container(
+                    result_container, start_time, search_results, network_time=network_context.get_http_runtime()
+                )
         except ssl.SSLError as e:
             # requests timeout (connect or read)
             self.handle_exception(result_container, e, suspend=True)
-            self.logger.error("SSLError {}, verify={}".format(e, searx.network.get_network(self.engine_name).verify))
+            self.logger.error("SSLError {}, verify={}".format(e, searx.network.NETWORKS.get(self.engine_name).verify))
         except (httpx.TimeoutException, asyncio.TimeoutError) as e:
             # requests timeout (connect or read)
             self.handle_exception(result_container, e, suspend=True)
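Note: taken together, the refactoring trades implicit per-thread state (set_timeout_for_thread, reset_time_for_thread, set_context_network_name) for an explicit NetworkContext whose lifetime is the with block, so timeout and HTTP-time accounting travel with the object instead of the thread. A self-contained usage sketch under the assumed API above; the stub, the 'wikipedia' network name, and fetch_and_parse are all hypothetical:

import contextlib

@contextlib.contextmanager
def networkcontext_manager(network_name, timeout, start_time=None):
    # Minimal stand-in for searx.network.networkcontext_manager (see the
    # sketch above); yields an object with call() and get_http_runtime().
    class _Ctx:
        def call(self, func, *args, **kwargs):
            return func(*args, **kwargs)
        def get_http_runtime(self):
            return 0.0
    yield _Ctx()

def fetch_and_parse():  # hypothetical engine work
    return [{'title': 'example'}]

with networkcontext_manager('wikipedia', 3.0) as network_context:
    results = network_context.call(fetch_and_parse)
    http_time = network_context.get_http_runtime()  # HTTP seconds spent inside call()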