[mod] refactoring: processors

searx.search.processor.abstract.EngineProcessor:
* manages the engine suspend time (see the backoff sketch below)
* adds the results to the ResultContainer (extend_container method)
* handles exceptions (handle_exception method)
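
To make the commit message concrete, here is a sketch of the backoff rule the new EngineProcessor._suspend applies. The two constants stand in for settings['search']['ban_time_on_fail'] and settings['search']['max_ban_time_on_fail']; the 5 s / 120 s values are the usual settings.yml defaults, assumed here for illustration.

BAN_TIME_ON_FAIL = 5        # assumption: settings['search']['ban_time_on_fail']
MAX_BAN_TIME_ON_FAIL = 120  # assumption: settings['search']['max_ban_time_on_fail']

def suspend_duration(continuous_errors, suspended_time=None):
    """How long an engine stays suspended after one more consecutive error."""
    if suspended_time is None:
        # linear backoff, capped at the configured maximum
        suspended_time = min(MAX_BAN_TIME_ON_FAIL,
                             continuous_errors * BAN_TIME_ON_FAIL)
    return suspended_time

assert suspend_duration(1) == 5      # first failure: short ban
assert suspend_duration(3) == 15     # grows linearly with continuous_errors
assert suspend_duration(100) == 120  # capped at max_ban_time_on_fail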
Alexandre Flament 2021-04-13 15:21:53 +02:00
parent 01cefffbf6
commit 015401be3d
6 changed files with 118 additions and 122 deletions

searx/engines/__init__.py

@@ -51,8 +51,6 @@ engine_default_args = {'paging': False,
                        'shortcut': '-',
                        'disabled': False,
                        'enable_http': False,
-                       'suspend_end_time': 0,
-                       'continuous_errors': 0,
                        'time_range_support': False,
                        'engine_type': 'online',
                        'display_error_messages': True,

searx/search/__init__.py

@@ -111,7 +111,7 @@ class Search:
             if request_params is None:
                 continue

-            with threading.RLock():
+            with processor.lock:
                 processor.engine.stats['sent_search_count'] += 1

             # append request to list
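
An editorial note on this change: "with threading.RLock():" acquires a lock object created on the spot, so no two threads ever contend on it and the statement never provided real mutual exclusion. The refactoring shares one lock per processor instead. A minimal sketch of the difference:

import threading

# Before: a fresh lock each time -- always acquired immediately, so
# concurrent updates to engine.stats were not actually serialized.
with threading.RLock():
    pass  # no real critical section

# After: one lock, created once in EngineProcessor.__init__ and shared.
lock = threading.RLock()
with lock:
    pass  # genuine mutual exclusion for threads sharing `lock`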

searx/search/checker/impl.py

@@ -4,7 +4,6 @@ import typing
 import types
 import functools
 import itertools
-import threading
 from time import time
 from urllib.parse import urlparse
@@ -385,7 +384,7 @@ class Checker:
         engineref_category = search_query.engineref_list[0].category
         params = self.processor.get_params(search_query, engineref_category)
         if params is not None:
-            with threading.RLock():
+            with self.processor.lock:
                 self.processor.engine.stats['sent_search_count'] += 1
             self.processor.search(search_query.query, params, result_container, time(), 5)
         return result_container

searx/search/processors/abstract.py

@@ -1,7 +1,14 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later

+import threading
 from abc import abstractmethod, ABC
+from time import time

 from searx import logger
+from searx.engines import settings
+from searx.network import get_time_for_thread
+from searx.metrology.error_recorder import record_exception, record_error
+from searx.exceptions import SearxEngineAccessDeniedException

 logger = logger.getChild('searx.search.processor')
@@ -9,11 +16,86 @@ logger = logger.getChild('searx.search.processor')

 class EngineProcessor(ABC):

+    __slots__ = 'engine', 'engine_name', 'suspend_end_time', 'suspend_reason', 'continuous_errors', 'lock'
+
     def __init__(self, engine, engine_name):
         self.engine = engine
         self.engine_name = engine_name
+        self.suspend_end_time = 0
+        self.suspend_reason = None
+        self.continuous_errors = 0
+        self.lock = threading.RLock()
+
+    @property
+    def is_suspended(self):
+        return self.suspend_end_time >= time()
+
+    def _suspend(self, suspended_time, suspend_reason):
+        with self.lock:
+            # update continuous_errors / suspend_end_time
+            self.continuous_errors += 1
+            if suspended_time is None:
+                suspended_time = min(settings['search']['max_ban_time_on_fail'],
+                                     self.continuous_errors * settings['search']['ban_time_on_fail'])
+            self.suspend_end_time = time() + suspended_time
+            self.suspend_reason = suspend_reason
+            logger.debug('Suspend engine for %i seconds', suspended_time)
+
+    def _resume(self):
+        with self.lock:
+            # reset the suspend variables
+            self.continuous_errors = 0
+            self.suspend_end_time = 0
+            self.suspend_reason = None
+
+    def handle_exception(self, result_container, reason, exception, suspend=False):
+        # update result_container
+        result_container.add_unresponsive_engine(self.engine_name, reason or str(exception))
+        # metrics
+        with self.lock:
+            self.engine.stats['errors'] += 1
+        if exception:
+            record_exception(self.engine_name, exception)
+        else:
+            record_error(self.engine_name, reason)
+        # suspend the engine ?
+        if suspend:
+            suspended_time = None
+            if isinstance(exception, SearxEngineAccessDeniedException):
+                suspended_time = exception.suspended_time
+            self._suspend(suspended_time, reason or str(exception))  # pylint: disable=no-member
+
+    def _extend_container_basic(self, result_container, start_time, search_results):
+        # update result_container
+        result_container.extend(self.engine_name, search_results)
+        engine_time = time() - start_time
+        page_load_time = get_time_for_thread()
+        result_container.add_timing(self.engine_name, engine_time, page_load_time)
+        # metrics
+        with self.lock:
+            self.engine.stats['engine_time'] += engine_time
+            self.engine.stats['engine_time_count'] += 1
+            # update stats with the total HTTP time
+            if page_load_time is not None and 'page_load_time' in self.engine.stats:
+                self.engine.stats['page_load_time'] += page_load_time
+                self.engine.stats['page_load_count'] += 1
+
+    def extend_container(self, result_container, start_time, search_results):
+        if getattr(threading.current_thread(), '_timeout', False):
+            # the main thread is not waiting anymore
+            self.handle_exception(result_container, 'Timeout', None)
+        else:
+            # check if the engine accepted the request
+            if search_results is not None:
+                self._extend_container_basic(result_container, start_time, search_results)
+            self._resume()
+
     def get_params(self, search_query, engine_category):
+        # skip suspended engines
+        if self.is_suspended:
+            logger.debug('Engine currently suspended: %s', self.engine_name)
+            return None
+
         # if paging is not supported, skip
         if search_query.pageno > 1 and not self.engine.paging:
             return None
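
To show how subclasses are expected to use these helpers, here is a hypothetical minimal processor; the engine object and its search function are illustrative, not part of the commit:

from searx.search.processors.abstract import EngineProcessor

class DummyProcessor(EngineProcessor):
    """Hypothetical subclass: timing, error and suspend bookkeeping are
    all delegated to the base class helpers."""
    engine_type = 'offline'

    def search(self, query, params, result_container, start_time, timeout_limit):
        try:
            results = self.engine.search(query, params)
            # records timing/stats, honors the thread timeout flag,
            # and resets the suspend counters on success
            self.extend_container(result_container, start_time, results)
        except Exception as e:
            # counts the error; suspend=True would additionally ban the engine
            self.handle_exception(result_container, 'unexpected crash', e)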

searx/search/processors/offline.py

@@ -1,51 +1,26 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
-import threading
-from time import time

 from searx import logger
-from searx.metrology.error_recorder import record_exception, record_error
 from searx.search.processors.abstract import EngineProcessor

-logger = logger.getChild('search.processor.offline')
+logger = logger.getChild('searx.search.processor.offline')


 class OfflineProcessor(EngineProcessor):

     engine_type = 'offline'

-    def _record_stats_on_error(self, result_container, start_time):
-        engine_time = time() - start_time
-        result_container.add_timing(self.engine_name, engine_time, engine_time)
-        with threading.RLock():
-            self.engine.stats['errors'] += 1
-
     def _search_basic(self, query, params):
         return self.engine.search(query, params)

     def search(self, query, params, result_container, start_time, timeout_limit):
         try:
             search_results = self._search_basic(query, params)
-            if search_results:
-                result_container.extend(self.engine_name, search_results)
-
-                engine_time = time() - start_time
-                result_container.add_timing(self.engine_name, engine_time, engine_time)
-                with threading.RLock():
-                    self.engine.stats['engine_time'] += engine_time
-                    self.engine.stats['engine_time_count'] += 1
+            self.extend_container(result_container, start_time, search_results)
         except ValueError as e:
-            record_exception(self.engine_name, e)
-            self._record_stats_on_error(result_container, start_time)
+            # do not record the error
             logger.exception('engine {0} : invalid input : {1}'.format(self.engine_name, e))
         except Exception as e:
-            record_exception(self.engine_name, e)
-            self._record_stats_on_error(result_container, start_time)
-            result_container.add_unresponsive_engine(self.engine_name, 'unexpected crash', str(e))
+            self.handle_exception(result_container, 'unexpected crash', e)
             logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))
-        else:
-            if getattr(threading.current_thread(), '_timeout', False):
-                record_error(self.engine_name, 'Timeout')
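
For context, the self.engine.search(query, params) call above goes straight to the engine module: an offline engine is simply a module exposing such a function. A hypothetical example (the result keys are common searx result fields, assumed here for illustration):

# my_offline_engine.py -- illustrative module driven by OfflineProcessor
def search(query, params):
    # query a local data source; no HTTP request is involved
    return [{
        'title': 'Example result for ' + query,
        'content': 'Produced locally by the offline engine.',
    }]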

searx/search/processors/online.py

@@ -1,23 +1,21 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
 from time import time
-import threading
 import asyncio

 import httpx

 import searx.network
-from searx.engines import settings
 from searx import logger
 from searx.utils import gen_useragent
 from searx.exceptions import (SearxEngineAccessDeniedException, SearxEngineCaptchaException,
                               SearxEngineTooManyRequestsException,)
-from searx.metrology.error_recorder import record_exception, record_error
+from searx.metrology.error_recorder import record_error
 from searx.search.processors.abstract import EngineProcessor

-logger = logger.getChild('search.processor.online')
+logger = logger.getChild('searx.search.processor.online')


 def default_request_params():
@@ -41,11 +39,6 @@ class OnlineProcessor(EngineProcessor):
         if params is None:
             return None

-        # skip suspended engines
-        if self.engine.suspend_end_time >= time():
-            logger.debug('Engine currently suspended: %s', self.engine_name)
-            return None
-
         # add default params
         params.update(default_request_params())
@@ -130,89 +123,38 @@ class OnlineProcessor(EngineProcessor):
         # set the network
         searx.network.set_context_network_name(self.engine_name)

-        # suppose everything will be alright
-        http_exception = False
-        suspended_time = None
-
         try:
             # send requests and parse the results
             search_results = self._search_basic(query, params)
-
-            # check if the engine accepted the request
-            if search_results is not None:
-                # yes, so add results
-                result_container.extend(self.engine_name, search_results)
-
-                # update engine time when there is no exception
-                engine_time = time() - start_time
-                page_load_time = searx.network.get_time_for_thread()
-                result_container.add_timing(self.engine_name, engine_time, page_load_time)
-                with threading.RLock():
-                    self.engine.stats['engine_time'] += engine_time
-                    self.engine.stats['engine_time_count'] += 1
-                    # update stats with the total HTTP time
-                    self.engine.stats['page_load_time'] += page_load_time
-                    self.engine.stats['page_load_count'] += 1
-        except Exception as e:
-            record_exception(self.engine_name, e)
-
-            # Timing
-            engine_time = time() - start_time
-            page_load_time = searx.network.get_time_for_thread()
-            result_container.add_timing(self.engine_name, engine_time, page_load_time)
-
-            # Record the errors
-            with threading.RLock():
-                self.engine.stats['errors'] += 1
-
-            if (issubclass(e.__class__, (httpx.TimeoutException, asyncio.TimeoutError))):
-                result_container.add_unresponsive_engine(self.engine_name, 'HTTP timeout')
-                # requests timeout (connect or read)
-                logger.error("engine {0} : HTTP requests timeout"
-                             "(search duration : {1} s, timeout: {2} s) : {3}"
-                             .format(self.engine_name, engine_time, timeout_limit, e.__class__.__name__))
-                http_exception = True
-            elif (issubclass(e.__class__, (httpx.HTTPError, httpx.StreamError))):
-                result_container.add_unresponsive_engine(self.engine_name, 'HTTP error')
-                # other requests exception
-                logger.exception("engine {0} : requests exception"
-                                 "(search duration : {1} s, timeout: {2} s) : {3}"
-                                 .format(self.engine_name, engine_time, timeout_limit, e))
-                http_exception = True
-            elif (issubclass(e.__class__, SearxEngineCaptchaException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'CAPTCHA required')
-                logger.exception('engine {0} : CAPTCHA'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            elif (issubclass(e.__class__, SearxEngineTooManyRequestsException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'too many requests')
-                logger.exception('engine {0} : Too many requests'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            elif (issubclass(e.__class__, SearxEngineAccessDeniedException)):
-                result_container.add_unresponsive_engine(self.engine_name, 'blocked')
-                logger.exception('engine {0} : Searx is blocked'.format(self.engine_name))
-                suspended_time = e.suspended_time  # pylint: disable=no-member
-            else:
-                result_container.add_unresponsive_engine(self.engine_name, 'unexpected crash')
-                # others errors
-                logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))
-        else:
-            if getattr(threading.current_thread(), '_timeout', False):
-                record_error(self.engine_name, 'Timeout')
-
-        # suspend the engine if there is an HTTP error
-        # or suspended_time is defined
-        with threading.RLock():
-            if http_exception or suspended_time:
-                # update continuous_errors / suspend_end_time
-                self.engine.continuous_errors += 1
-                if suspended_time is None:
-                    suspended_time = min(settings['search']['max_ban_time_on_fail'],
-                                         self.engine.continuous_errors * settings['search']['ban_time_on_fail'])
-                self.engine.suspend_end_time = time() + suspended_time
-            else:
-                # reset the suspend variables
-                self.engine.continuous_errors = 0
-                self.engine.suspend_end_time = 0
+            self.extend_container(result_container, start_time, search_results)
+        except (httpx.TimeoutException, asyncio.TimeoutError) as e:
+            # requests timeout (connect or read)
+            self.handle_exception(result_container, 'HTTP timeout', e, suspend=True)
+            logger.error("engine {0} : HTTP requests timeout"
+                         "(search duration : {1} s, timeout: {2} s) : {3}"
+                         .format(self.engine_name, time() - start_time,
+                                 timeout_limit,
+                                 e.__class__.__name__))
+        except (httpx.HTTPError, httpx.StreamError) as e:
+            # other requests exception
+            self.handle_exception(result_container, 'HTTP error', e, suspend=True)
+            logger.exception("engine {0} : requests exception"
+                             "(search duration : {1} s, timeout: {2} s) : {3}"
+                             .format(self.engine_name, time() - start_time,
+                                     timeout_limit,
+                                     e))
+        except SearxEngineCaptchaException as e:
+            self.handle_exception(result_container, 'CAPTCHA required', e, suspend=True)
+            logger.exception('engine {0} : CAPTCHA'.format(self.engine_name))
+        except SearxEngineTooManyRequestsException as e:
+            self.handle_exception(result_container, 'too many requests', e, suspend=True)
+            logger.exception('engine {0} : Too many requests'.format(self.engine_name))
+        except SearxEngineAccessDeniedException as e:
+            self.handle_exception(result_container, 'blocked', e, suspend=True)
+            logger.exception('engine {0} : Searx is blocked'.format(self.engine_name))
+        except Exception as e:
+            self.handle_exception(result_container, 'unexpected crash', e)
+            logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))

     def get_default_tests(self):
         tests = {}
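
As a quick recap of the rewritten search() above: each exception class now maps to a (reason, suspend) pair handed to handle_exception. This summary mirrors the except clauses in the diff and adds no behavior:

import asyncio
import httpx
from searx.exceptions import (SearxEngineAccessDeniedException,
                              SearxEngineCaptchaException,
                              SearxEngineTooManyRequestsException)

# Order matters: the first matching entry wins, exactly like the except
# clauses (Captcha/TooManyRequests are checked before the more general
# AccessDenied, and Exception is the final catch-all).
EXCEPTION_TO_REASON = [
    ((httpx.TimeoutException, asyncio.TimeoutError), ('HTTP timeout', True)),
    ((httpx.HTTPError, httpx.StreamError),           ('HTTP error', True)),
    ((SearxEngineCaptchaException,),                 ('CAPTCHA required', True)),
    ((SearxEngineTooManyRequestsException,),         ('too many requests', True)),
    ((SearxEngineAccessDeniedException,),            ('blocked', True)),
    ((Exception,),                                   ('unexpected crash', False)),
]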