[mod] refactoring: processors

Report suspended engines to the user.

searx.search.processors.abstract:
* manages the suspend time (per network).
* reports the suspension to the ResultContainer (method extend_container_if_suspended).
* adds the results to the ResultContainer (method extend_container).
* handles exceptions (method handle_exception).
Alexandre Flament 2021-04-13 15:21:53 +02:00
parent ae5954f2da
commit aae7830d14
9 changed files with 143 additions and 125 deletions
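
Taken together, the processor API below replaces the ad-hoc suspend/stats bookkeeping that used to live inside each processor. A hedged sketch of the per-engine request flow a caller now follows (the method names match the diffs below; the processor, search_query, engineref and result_container objects here are stand-ins, and this helper function is hypothetical):

    def start_engine_request(processor, search_query, engineref, result_container):
        # engine suspended? tell the ResultContainer and skip the request
        if processor.extend_container_if_suspended(result_container):
            return None
        # build request parameters; None means the engine declines the query
        request_params = processor.get_params(search_query, engineref.category)
        if request_params is None:
            return None
        # count the outgoing request under the processor's own lock
        with processor.lock:
            processor.engine.stats['sent_search_count'] += 1
        return request_params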

searx/engines.py

@@ -51,8 +51,6 @@ engine_default_args = {'paging': False,
                        'shortcut': '-',
                        'disabled': False,
                        'enable_http': False,
-                       'suspend_end_time': 0,
-                       'continuous_errors': 0,
                        'time_range_support': False,
                        'engine_type': 'online',
                        'display_error_messages': True,

searx/network/__init__.py

@@ -199,7 +199,7 @@ class Network:

 def get_network(name=None):
     global NETWORKS
-    return NETWORKS[name or DEFAULT_NAME]
+    return NETWORKS.get(name or DEFAULT_NAME)


 def initialize(settings_engines=None, settings_outgoing=None):
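
The change from NETWORKS[...] to NETWORKS.get(...) is what lets the new processor code probe for a per-engine network: get_network() now returns None for an unknown name instead of raising KeyError (searx/search/processors/abstract.py below falls back to the engine name in that case). A two-line illustration with hypothetical dict contents:

    NETWORKS = {'default': object()}          # hypothetical contents
    assert NETWORKS.get('unknown') is None    # NETWORKS['unknown'] would raise KeyError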

searx/results.py

@@ -369,9 +369,9 @@ class ResultContainer:
             return 0
         return resultnum_sum / len(self._number_of_results)

-    def add_unresponsive_engine(self, engine_name, error_type, error_message=None):
+    def add_unresponsive_engine(self, engine_name, error_type, error_message=None, suspended=False):
         if engines[engine_name].display_error_messages:
-            self.unresponsive_engines.add((engine_name, error_type, error_message))
+            self.unresponsive_engines.add((engine_name, error_type, error_message, suspended))

     def add_timing(self, engine_name, engine_time, page_load_time):
         self.timings.append({

searx/search/__init__.py

@@ -106,12 +106,16 @@ class Search:
         for engineref in self.search_query.engineref_list:
             processor = processors[engineref.name]

+            # stop the request now if the engine is suspended
+            if processor.extend_container_if_suspended(self.result_container):
+                continue
+
             # set default request parameters
             request_params = processor.get_params(self.search_query, engineref.category)
             if request_params is None:
                 continue

-            with threading.RLock():
+            with processor.lock:
                 processor.engine.stats['sent_search_count'] += 1

             # append request to list
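
A fix worth noting in this hunk: the old `with threading.RLock():` constructed a fresh lock on every call, so no two threads ever contended on the same object and the stats update was effectively unguarded. The new per-processor lock actually serializes writers. A standalone sketch of the difference (illustration only, not searx code):

    import threading

    counter = 0

    def broken_increment():
        global counter
        with threading.RLock():      # a brand-new lock each call: excludes nobody
            counter += 1

    shared_lock = threading.Lock()   # the new pattern: one lock, created once

    def fixed_increment():
        global counter
        with shared_lock:            # all threads serialize on the same object
            counter += 1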

searx/search/checker/impl.py

@@ -4,7 +4,6 @@ import typing
 import types
 import functools
 import itertools
-import threading
 from time import time
 from urllib.parse import urlparse

@@ -385,7 +384,7 @@ class Checker:
         engineref_category = search_query.engineref_list[0].category
         params = self.processor.get_params(search_query, engineref_category)
         if params is not None:
-            with threading.RLock():
+            with self.processor.lock:
                 self.processor.engine.stats['sent_search_count'] += 1
             self.processor.search(search_query.query, params, result_container, time(), 5)
         return result_container

searx/search/processors/abstract.py

@@ -1,17 +1,115 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later

+import threading
 from abc import abstractmethod, ABC
+from time import time
+
 from searx import logger
+from searx.engines import settings
+from searx.network import get_time_for_thread, get_network
+from searx.metrology.error_recorder import record_exception, record_error
+from searx.exceptions import SearxEngineAccessDeniedException

 logger = logger.getChild('searx.search.processor')

+SUSPENDED_STATUS = {}
+
+
+class SuspendedStatus:
+
+    __slots__ = 'suspend_end_time', 'suspend_reason', 'continuous_errors', 'lock'
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.continuous_errors = 0
+        self.suspend_end_time = 0
+        self.suspend_reason = None
+
+    @property
+    def is_suspended(self):
+        return self.suspend_end_time >= time()
+
+    def suspend(self, suspended_time, suspend_reason):
+        with self.lock:
+            # update continuous_errors / suspend_end_time
+            self.continuous_errors += 1
+            if suspended_time is None:
+                suspended_time = min(settings['search']['max_ban_time_on_fail'],
+                                     self.continuous_errors * settings['search']['ban_time_on_fail'])
+            self.suspend_end_time = time() + suspended_time
+            self.suspend_reason = suspend_reason
+            logger.debug('Suspend engine for %i seconds', suspended_time)
+
+    def resume(self):
+        with self.lock:
+            # reset the suspend variables
+            self.continuous_errors = 0
+            self.suspend_end_time = 0
+            self.suspend_reason = None
+

 class EngineProcessor(ABC):

+    __slots__ = 'engine', 'engine_name', 'lock', 'suspended_status'
+
     def __init__(self, engine, engine_name):
         self.engine = engine
         self.engine_name = engine_name
+        self.lock = threading.Lock()
+        key = get_network(self.engine_name)
+        key = id(key) if key else self.engine_name
+        self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
+
+    def handle_exception(self, result_container, reason, exception, suspend=False, display_exception=True):
+        # update result_container
+        error_message = str(exception) if display_exception and exception else None
+        result_container.add_unresponsive_engine(self.engine_name, reason, error_message)
+        # metrics
+        with self.lock:
+            self.engine.stats['errors'] += 1
+        if exception:
+            record_exception(self.engine_name, exception)
+        else:
+            record_error(self.engine_name, reason)
+        # suspend the engine ?
+        if suspend:
+            suspended_time = None
+            if isinstance(exception, SearxEngineAccessDeniedException):
+                suspended_time = exception.suspended_time
+            self.suspended_status.suspend(suspended_time, reason)  # pylint: disable=no-member
+
+    def _extend_container_basic(self, result_container, start_time, search_results):
+        # update result_container
+        result_container.extend(self.engine_name, search_results)
+        engine_time = time() - start_time
+        page_load_time = get_time_for_thread()
+        result_container.add_timing(self.engine_name, engine_time, page_load_time)
+        # metrics
+        with self.lock:
+            self.engine.stats['engine_time'] += engine_time
+            self.engine.stats['engine_time_count'] += 1
+            # update stats with the total HTTP time
+            if page_load_time is not None and 'page_load_time' in self.engine.stats:
+                self.engine.stats['page_load_time'] += page_load_time
+                self.engine.stats['page_load_count'] += 1
+
+    def extend_container(self, result_container, start_time, search_results):
+        if getattr(threading.current_thread(), '_timeout', False):
+            # the main thread is not waiting anymore
+            self.handle_exception(result_container, 'Timeout', None)
+        else:
+            # check if the engine accepted the request
+            if search_results is not None:
+                self._extend_container_basic(result_container, start_time, search_results)
+            self.suspended_status.resume()
+
+    def extend_container_if_suspended(self, result_container):
+        if self.suspended_status.is_suspended:
+            result_container.add_unresponsive_engine(self.engine_name,
+                                                     self.suspended_status.suspend_reason,
+                                                     suspended=True)
+            return True
+        return False

     def get_params(self, search_query, engine_category):
         # if paging is not supported, skip
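
Two consequences of the code above are easy to miss. First, SUSPENDED_STATUS is keyed by id(network) when the engine has a dedicated network, so engines sharing one network also share one SuspendedStatus; that is the "per network" in the commit message. Second, the ban grows linearly with continuous_errors and is capped by max_ban_time_on_fail. A sketch of the escalation with hypothetical settings values (the formula itself is taken from suspend() above):

    ban_time_on_fail = 5          # hypothetical settings['search'] values
    max_ban_time_on_fail = 120

    for continuous_errors in (1, 2, 3, 24, 25):
        suspended_time = min(max_ban_time_on_fail,
                             continuous_errors * ban_time_on_fail)
        print(continuous_errors, suspended_time)  # 5, 10, 15, 120, 120 seconds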

searx/search/processors/offline.py

@@ -1,51 +1,26 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later

-import threading
-from time import time
 from searx import logger
-from searx.metrology.error_recorder import record_exception, record_error
 from searx.search.processors.abstract import EngineProcessor

-logger = logger.getChild('search.processor.offline')
+logger = logger.getChild('searx.search.processor.offline')


 class OfflineProcessor(EngineProcessor):

     engine_type = 'offline'

-    def _record_stats_on_error(self, result_container, start_time):
-        engine_time = time() - start_time
-        result_container.add_timing(self.engine_name, engine_time, engine_time)
-        with threading.RLock():
-            self.engine.stats['errors'] += 1
-
     def _search_basic(self, query, params):
         return self.engine.search(query, params)

     def search(self, query, params, result_container, start_time, timeout_limit):
         try:
             search_results = self._search_basic(query, params)
-
-            if search_results:
-                result_container.extend(self.engine_name, search_results)
-
-                engine_time = time() - start_time
-                result_container.add_timing(self.engine_name, engine_time, engine_time)
-                with threading.RLock():
-                    self.engine.stats['engine_time'] += engine_time
-                    self.engine.stats['engine_time_count'] += 1
-
+            self.extend_container(result_container, start_time, search_results)
         except ValueError as e:
-            record_exception(self.engine_name, e)
-            self._record_stats_on_error(result_container, start_time)
+            # do not record the error
             logger.exception('engine {0} : invalid input : {1}'.format(self.engine_name, e))
         except Exception as e:
-            record_exception(self.engine_name, e)
-            self._record_stats_on_error(result_container, start_time)
-            result_container.add_unresponsive_engine(self.engine_name, 'unexpected crash', str(e))
+            self.handle_exception(result_container, 'unexpected crash', e)
             logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))
-        else:
-            if getattr(threading.current_thread(), '_timeout', False):
-                record_error(self.engine_name, 'Timeout')
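
After this simplification an offline engine only implements search(query, params) and returns a list of result dicts (or a falsy value); timing, timeout detection and error accounting are inherited from EngineProcessor. A hypothetical minimal engine module for illustration (the dict keys shown are the usual searx result fields, not something this diff mandates):

    # hypothetical offline engine module
    def search(query, params):
        if not query:
            raise ValueError('empty query')   # logged above as 'invalid input'
        return [{'url': 'https://example.org/?q=' + query,
                 'title': 'example',
                 'content': 'offline match for ' + query}]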

searx/search/processors/online.py

@@ -1,23 +1,21 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later

 from time import time
-import threading
 import asyncio
 import httpx

 import searx.network
-from searx.engines import settings
 from searx import logger
 from searx.utils import gen_useragent
 from searx.exceptions import (SearxEngineAccessDeniedException, SearxEngineCaptchaException,
                               SearxEngineTooManyRequestsException,)
-from searx.metrology.error_recorder import record_exception, record_error
+from searx.metrology.error_recorder import record_error

 from searx.search.processors.abstract import EngineProcessor

-logger = logger.getChild('search.processor.online')
+logger = logger.getChild('searx.search.processor.online')


 def default_request_params():

@@ -41,11 +39,6 @@ class OnlineProcessor(EngineProcessor):
         if params is None:
             return None

-        # skip suspended engines
-        if self.engine.suspend_end_time >= time():
-            logger.debug('Engine currently suspended: %s', self.engine_name)
-            return None
-
         # add default params
         params.update(default_request_params())

@@ -130,89 +123,38 @@ class OnlineProcessor(EngineProcessor):
         # set the network
         searx.network.set_context_network_name(self.engine_name)

-        # suppose everything will be alright
-        http_exception = False
-        suspended_time = None
-
         try:
             # send requests and parse the results
             search_results = self._search_basic(query, params)
-
-            # check if the engine accepted the request
-            if search_results is not None:
-                # yes, so add results
-                result_container.extend(self.engine_name, search_results)
-
-                # update engine time when there is no exception
-                engine_time = time() - start_time
-                page_load_time = searx.network.get_time_for_thread()
-                result_container.add_timing(self.engine_name, engine_time, page_load_time)
-                with threading.RLock():
-                    self.engine.stats['engine_time'] += engine_time
-                    self.engine.stats['engine_time_count'] += 1
-                    # update stats with the total HTTP time
-                    self.engine.stats['page_load_time'] += page_load_time
-                    self.engine.stats['page_load_count'] += 1
-        except Exception as e:
-            record_exception(self.engine_name, e)
-
-            # Timing
-            engine_time = time() - start_time
-            page_load_time = searx.network.get_time_for_thread()
-            result_container.add_timing(self.engine_name, engine_time, page_load_time)
-
-            # Record the errors
-            with threading.RLock():
-                self.engine.stats['errors'] += 1
-
-            if (issubclass(e.__class__, (httpx.TimeoutException, asyncio.TimeoutError))):
-                result_container.add_unresponsive_engine(self.engine_name, 'HTTP timeout')
-                # requests timeout (connect or read)
-                logger.error("engine {0} : HTTP requests timeout"
-                             "(search duration : {1} s, timeout: {2} s) : {3}"
-                             .format(self.engine_name, engine_time, timeout_limit, e.__class__.__name__))
-                http_exception = True
-            elif (issubclass(e.__class__, (httpx.HTTPError, httpx.StreamError))):
-                result_container.add_unresponsive_engine(self.engine_name, 'HTTP error')
-                # other requests exception
-                logger.exception("engine {0} : requests exception"
-                                 "(search duration : {1} s, timeout: {2} s) : {3}"
-                                 .format(self.engine_name, engine_time, timeout_limit, e))
-                http_exception = True
-            elif (issubclass(e.__class__, SearxEngineCaptchaException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'CAPTCHA required')
-                logger.exception('engine {0} : CAPTCHA'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            elif (issubclass(e.__class__, SearxEngineTooManyRequestsException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'too many requests')
-                logger.exception('engine {0} : Too many requests'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            elif (issubclass(e.__class__, SearxEngineAccessDeniedException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'blocked')
-                logger.exception('engine {0} : Searx is blocked'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            else:
-                result_container.add_unresponsive_engine(self.engine_name, 'unexpected crash')
-                # others errors
-                logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))
-        else:
-            if getattr(threading.current_thread(), '_timeout', False):
-                record_error(self.engine_name, 'Timeout')
-
-        # suspend the engine if there is an HTTP error
-        # or suspended_time is defined
-        with threading.RLock():
-            if http_exception or suspended_time:
-                # update continuous_errors / suspend_end_time
-                self.engine.continuous_errors += 1
-                if suspended_time is None:
-                    suspended_time = min(settings['search']['max_ban_time_on_fail'],
-                                         self.engine.continuous_errors * settings['search']['ban_time_on_fail'])
-                self.engine.suspend_end_time = time() + suspended_time
-            else:
-                # reset the suspend variables
-                self.engine.continuous_errors = 0
-                self.engine.suspend_end_time = 0
+            self.extend_container(result_container, start_time, search_results)
+        except (httpx.TimeoutException, asyncio.TimeoutError) as e:
+            # requests timeout (connect or read)
+            self.handle_exception(result_container, 'HTTP timeout', e, suspend=True, display_exception=False)
+            logger.error("engine {0} : HTTP requests timeout"
+                         "(search duration : {1} s, timeout: {2} s) : {3}"
+                         .format(self.engine_name, time() - start_time,
+                                 timeout_limit,
+                                 e.__class__.__name__))
+        except (httpx.HTTPError, httpx.StreamError) as e:
+            # other requests exception
+            self.handle_exception(result_container, 'HTTP error', e, suspend=True, display_exception=False)
+            logger.exception("engine {0} : requests exception"
+                             "(search duration : {1} s, timeout: {2} s) : {3}"
+                             .format(self.engine_name, time() - start_time,
+                                     timeout_limit,
+                                     e))
+        except SearxEngineCaptchaException as e:
+            self.handle_exception(result_container, 'CAPTCHA required', e, suspend=True, display_exception=False)
+            logger.exception('engine {0} : CAPTCHA'.format(self.engine_name))
+        except SearxEngineTooManyRequestsException as e:
+            self.handle_exception(result_container, 'too many requests', e, suspend=True, display_exception=False)
+            logger.exception('engine {0} : Too many requests'.format(self.engine_name))
+        except SearxEngineAccessDeniedException as e:
+            self.handle_exception(result_container, 'blocked', e, suspend=True, display_exception=False)
+            logger.exception('engine {0} : Searx is blocked'.format(self.engine_name))
+        except Exception as e:
+            self.handle_exception(result_container, 'unexpected crash', e, display_exception=False)
+            logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))

     def get_default_tests(self):
         tests = {}
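
All five except branches now funnel into handle_exception(); only the reason string and the log line differ. The single isinstance() check in handle_exception() still recovers the exception-supplied ban length for every branch that used to read e.suspended_time, because the CAPTCHA and too-many-requests exceptions derive from SearxEngineAccessDeniedException in searx.exceptions (assumed here from the behavior the refactor preserves):

    from searx.exceptions import (SearxEngineAccessDeniedException,
                                  SearxEngineCaptchaException,
                                  SearxEngineTooManyRequestsException)

    # both asserts hold, so one isinstance() covers all three suspend branches
    assert issubclass(SearxEngineCaptchaException, SearxEngineAccessDeniedException)
    assert issubclass(SearxEngineTooManyRequestsException, SearxEngineAccessDeniedException)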

searx/webapp.py

@@ -764,6 +764,8 @@ def __get_translated_errors(unresponsive_engines):
         error_msg = gettext(unresponsive_engine[1])
         if unresponsive_engine[2]:
             error_msg = "{} {}".format(error_msg, unresponsive_engine[2])
+        if unresponsive_engine[3]:
+            error_msg = gettext('Suspended') + ': ' + error_msg
         translated_errors.add((unresponsive_engine[0], error_msg))
     return translated_errors
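
With the fourth tuple element in place, a suspended engine's message is prefixed for the user. A sketch of the tuple's round trip from ResultContainer.add_unresponsive_engine() to the helper above, with gettext stubbed as identity and a made-up engine entry:

    def gettext(s):          # stub for illustration
        return s

    unresponsive_engine = ('example-engine', 'too many requests', None, True)

    error_msg = gettext(unresponsive_engine[1])
    if unresponsive_engine[2]:
        error_msg = "{} {}".format(error_msg, unresponsive_engine[2])
    if unresponsive_engine[3]:
        error_msg = gettext('Suspended') + ': ' + error_msg
    # -> ('example-engine', 'Suspended: too many requests')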