diff --git a/docs/admin/engines/settings.rst b/docs/admin/engines/settings.rst index cac9d2868..f37cf973c 100644 --- a/docs/admin/engines/settings.rst +++ b/docs/admin/engines/settings.rst @@ -473,6 +473,9 @@ engine is shown. Most of the options have a default value or even are optional. max_connections: 100 max_keepalive_connections: 10 keepalive_expiry: 5.0 + rate_limit: + - max_requests: 200 + interval: 60 proxies: http: - http://proxy1:8080 @@ -545,6 +548,15 @@ engine is shown. Most of the options have a default value or even are optional. - ``ipv4`` set ``local_addresses`` to ``0.0.0.0`` (use only IPv4 local addresses) - ``ipv6`` set ``local_addresses`` to ``::`` (use only IPv6 local addresses) +``rate_limit``: optional + Limit how many outgoing requests is SearXNG going to send to the engines. + It is a list where each element has: + + - ``max_requests`` is the maximum number of requests that will be sent to this + engine per interval. + - ``interval`` (optional) is the number of seconds before this engine's rate + limiter is reset. Defaults to 1 second if unspecified. + .. note:: A few more options are possible, but they are pretty specific to some diff --git a/searx/engines/__init__.py b/searx/engines/__init__.py index c61f50d4b..9a6706871 100644 --- a/searx/engines/__init__.py +++ b/searx/engines/__init__.py @@ -45,6 +45,7 @@ ENGINE_DEFAULT_ARGS = { "using_tor_proxy": False, "display_error_messages": True, "send_accept_language_header": False, + "rate_limit": [{"max_requests": float('inf'), "interval": 1}], "tokens": [], "about": {}, } diff --git a/searx/search/__init__.py b/searx/search/__init__.py index 9d337916c..cc85a834c 100644 --- a/searx/search/__init__.py +++ b/searx/search/__init__.py @@ -98,6 +98,10 @@ class Search: if request_params is None: continue + # stop request if it exceeds engine's rate limit + if processor.exceeds_rate_limit(): + continue + counter_inc('engine', engineref.name, 'search', 'count', 'sent') # append request to list diff --git a/searx/search/processors/abstract.py b/searx/search/processors/abstract.py index d74616db0..a775fd904 100644 --- a/searx/search/processors/abstract.py +++ b/searx/search/processors/abstract.py @@ -15,6 +15,7 @@ from searx.engines import engines from searx.network import get_time_for_thread, get_network from searx.metrics import histogram_observe, counter_inc, count_exception, count_error from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException +from searx.shared import storage, run_locked from searx.utils import get_engine_from_settings logger = logger.getChild('searx.search.processor') @@ -137,6 +138,36 @@ class EngineProcessor(ABC): return True return False + def exceeds_rate_limit(self): + def check_rate_limiter(engine_name, max_requests, interval): + key = f'rate_limiter_{engine_name}_{max_requests}r/{interval}s' + # check requests count + count = storage.get_int(key) + if count is None: + # initialize counter with expiration time + storage.set_int(key, 1, interval) + elif count >= max_requests: + logger.debug(f"{engine_name} exceeded rate limit of {max_requests} requests per {interval} seconds") + return True + else: + # update counter + storage.set_int(key, count + 1) + return False + + result = False + # add counter to all of the engine's rate limiters + for rate_limit in self.engine.rate_limit: + max_requests = rate_limit['max_requests'] + interval = rate_limit.get('interval', 1) + + if max_requests == float('inf'): + continue + + if run_locked(check_rate_limiter, self.engine_name, max_requests, interval): + result = True + + return result + def get_params(self, search_query, engine_category): """Returns a set of *request params* or ``None`` if request is not supported. diff --git a/searx/shared/__init__.py b/searx/shared/__init__.py index d10ddb33d..f8d217b39 100644 --- a/searx/shared/__init__.py +++ b/searx/shared/__init__.py @@ -11,7 +11,7 @@ try: uwsgi = importlib.import_module('uwsgi') except: # no uwsgi - from .shared_simple import SimpleSharedDict as SharedDict, schedule + from .shared_simple import SimpleSharedDict as SharedDict, schedule, run_locked logger.info('Use shared_simple implementation') else: @@ -32,7 +32,7 @@ else: else: # uwsgi - from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule + from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule, run_locked logger.info('Use shared_uwsgi implementation') diff --git a/searx/shared/shared_abstract.py b/searx/shared/shared_abstract.py index af4be30ae..32f9d98bf 100644 --- a/searx/shared/shared_abstract.py +++ b/searx/shared/shared_abstract.py @@ -10,7 +10,7 @@ class SharedDict(ABC): pass @abstractmethod - def set_int(self, key: str, value: int): + def set_int(self, key: str, value: int, expire: Optional[int] = None): pass @abstractmethod @@ -18,5 +18,5 @@ class SharedDict(ABC): pass @abstractmethod - def set_str(self, key: str, value: str): + def set_str(self, key: str, value: str, expire: Optional[int] = None): pass diff --git a/searx/shared/shared_simple.py b/searx/shared/shared_simple.py index 2b9d4c2da..0b0866982 100644 --- a/searx/shared/shared_simple.py +++ b/searx/shared/shared_simple.py @@ -16,14 +16,28 @@ class SimpleSharedDict(shared_abstract.SharedDict): def get_int(self, key: str) -> Optional[int]: return self.d.get(key, None) - def set_int(self, key: str, value: int): + def set_int(self, key: str, value: int, expire: Optional[int] = None): self.d[key] = value + if expire: + self._expire(key, expire) def get_str(self, key: str) -> Optional[str]: return self.d.get(key, None) - def set_str(self, key: str, value: str): + def set_str(self, key: str, value: str, expire: Optional[int] = None): self.d[key] = value + if expire: + self._expire(key, expire) + + def _expire(self, key: str, expire: int): + t = threading.Timer(expire, lambda k, d: d.pop(k), args=[key, self.d]) + t.daemon = True + t.start() + + +def run_locked(func, *args): + # SimpleSharedDict is not actually shared, so no locking needed + return func(*args) def schedule(delay, func, *args): diff --git a/searx/shared/shared_uwsgi.py b/searx/shared/shared_uwsgi.py index 0248c6234..d2c250bf6 100644 --- a/searx/shared/shared_uwsgi.py +++ b/searx/shared/shared_uwsgi.py @@ -2,6 +2,7 @@ import time from typing import Optional +import threading import uwsgi # pyright: ignore # pylint: disable=E0401 from . import shared_abstract @@ -17,9 +18,11 @@ class UwsgiCacheSharedDict(shared_abstract.SharedDict): else: return int.from_bytes(value, 'big') - def set_int(self, key: str, value: int): + def set_int(self, key: str, value: int, expire: Optional[int] = None): b = value.to_bytes(4, 'big') uwsgi.cache_update(key, b) + if expire: + self._expire(key, expire) def get_str(self, key: str) -> Optional[str]: value = uwsgi.cache_get(key) @@ -28,9 +31,26 @@ class UwsgiCacheSharedDict(shared_abstract.SharedDict): else: return value.decode('utf-8') - def set_str(self, key: str, value: str): + def set_str(self, key: str, value: str, expire: Optional[int] = None): b = value.encode('utf-8') uwsgi.cache_update(key, b) + if expire: + self._expire(key, expire) + + def _expire(self, key: str, expire: int): + t = threading.Timer(expire, uwsgi.cache_del, args=[key]) + t.daemon = True + t.start() + + +def run_locked(func, *args): + result = None + uwsgi.lock() + try: + result = func(*args) + finally: + uwsgi.unlock() + return result def schedule(delay, func, *args):