add rate limiting per engine

This commit is contained in:
Marc Abonce Seguin 2022-04-24 23:52:13 -05:00
parent 666cd1f635
commit 85eca4fb22
8 changed files with 90 additions and 8 deletions

View file

@ -473,6 +473,9 @@ engine is shown. Most of the options have a default value or even are optional.
max_connections: 100
max_keepalive_connections: 10
keepalive_expiry: 5.0
rate_limit:
- max_requests: 200
interval: 60
proxies:
http:
- http://proxy1:8080
@ -545,6 +548,15 @@ engine is shown. Most of the options have a default value or even are optional.
- ``ipv4`` set ``local_addresses`` to ``0.0.0.0`` (use only IPv4 local addresses)
- ``ipv6`` set ``local_addresses`` to ``::`` (use only IPv6 local addresses)
``rate_limit``: optional
Limit how many outgoing requests SearXNG is going to send to each engine.
It is a list where each element has:
- ``max_requests`` is the maximum number of requests that will be sent to this
engine per interval.
- ``interval`` (optional) is the number of seconds before this engine's rate
limiter is reset. Defaults to 1 second if unspecified.
.. note::
A few more options are possible, but they are pretty specific to some

View file

@ -45,6 +45,7 @@ ENGINE_DEFAULT_ARGS = {
"using_tor_proxy": False,
"display_error_messages": True,
"send_accept_language_header": False,
"rate_limit": [{"max_requests": float('inf'), "interval": 1}],
"tokens": [],
"about": {},
}

View file

@ -98,6 +98,10 @@ class Search:
if request_params is None:
continue
# stop request if it exceeds engine's rate limit
if processor.exceeds_rate_limit():
continue
counter_inc('engine', engineref.name, 'search', 'count', 'sent')
# append request to list

View file

@ -15,6 +15,7 @@ from searx.engines import engines
from searx.network import get_time_for_thread, get_network
from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
from searx.shared import storage, run_locked
from searx.utils import get_engine_from_settings
logger = logger.getChild('searx.search.processor')
@ -137,6 +138,36 @@ class EngineProcessor(ABC):
return True
return False
def exceeds_rate_limit(self):
    """Return ``True`` when any of this engine's rate limiters is exhausted.

    Bumps the shared request counter of every entry in
    ``self.engine.rate_limit``; if at least one counter already reached its
    maximum, the check reports the limit as exceeded.  Entries with an
    unlimited ``max_requests`` (``inf``) are skipped.
    """

    def check_rate_limiter(engine_name, max_requests, interval):
        # One shared counter per (engine, limit, interval) combination.
        key = f'rate_limiter_{engine_name}_{max_requests}r/{interval}s'
        count = storage.get_int(key)
        if count is None:
            # First request of this window: start counting and arm the
            # expiration so the counter resets after `interval` seconds.
            storage.set_int(key, 1, interval)
        elif count >= max_requests:
            logger.debug(f"{engine_name} exceeded rate limit of {max_requests} requests per {interval} seconds")
            return True
        else:
            # Window still open: record one more outgoing request.
            storage.set_int(key, count + 1)
        return False

    exceeded = False
    # Bump every limiter's counter, even after one has already tripped,
    # so all windows stay in sync with the actual request attempts.
    for limit in self.engine.rate_limit:
        max_requests = limit['max_requests']
        interval = limit.get('interval', 1)
        if max_requests == float('inf'):
            continue
        if run_locked(check_rate_limiter, self.engine_name, max_requests, interval):
            exceeded = True
    return exceeded
def get_params(self, search_query, engine_category):
"""Returns a set of *request params* or ``None`` if request is not supported.

View file

@ -11,7 +11,7 @@ try:
uwsgi = importlib.import_module('uwsgi')
except:
# no uwsgi
from .shared_simple import SimpleSharedDict as SharedDict, schedule
from .shared_simple import SimpleSharedDict as SharedDict, schedule, run_locked
logger.info('Use shared_simple implementation')
else:
@ -32,7 +32,7 @@ else:
else:
# uwsgi
from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule
from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule, run_locked
logger.info('Use shared_uwsgi implementation')

View file

@ -10,7 +10,7 @@ class SharedDict(ABC):
pass
@abstractmethod
def set_int(self, key: str, value: int):
def set_int(self, key: str, value: int, expire: Optional[int] = None):
pass
@abstractmethod
@ -18,5 +18,5 @@ class SharedDict(ABC):
pass
@abstractmethod
def set_str(self, key: str, value: str):
def set_str(self, key: str, value: str, expire: Optional[int] = None):
pass

View file

@ -16,14 +16,28 @@ class SimpleSharedDict(shared_abstract.SharedDict):
def get_int(self, key: str) -> Optional[int]:
return self.d.get(key, None)
def set_int(self, key: str, value: int):
def set_int(self, key: str, value: int, expire: Optional[int] = None):
    """Store an integer; schedule its removal after *expire* seconds when given."""
    self.d[key] = value
    if not expire:
        return
    self._expire(key, expire)
def get_str(self, key: str) -> Optional[str]:
    """Return the string stored under *key*, or ``None`` when absent."""
    return self.d.get(key)
def set_str(self, key: str, value: str):
def set_str(self, key: str, value: str, expire: Optional[int] = None):
    """Store a string; schedule its removal after *expire* seconds when given."""
    self.d[key] = value
    if not expire:
        return
    self._expire(key, expire)
def _expire(self, key: str, expire: int):
t = threading.Timer(expire, lambda k, d: d.pop(k), args=[key, self.d])
t.daemon = True
t.start()
def run_locked(func, *args):
    """Call *func* with *args* and return its result.

    ``SimpleSharedDict`` lives in a single process, so no actual locking is
    required here — this exists only to mirror the uwsgi implementation.
    """
    result = func(*args)
    return result
def schedule(delay, func, *args):

View file

@ -2,6 +2,7 @@
import time
from typing import Optional
import threading
import uwsgi # pyright: ignore # pylint: disable=E0401
from . import shared_abstract
@ -17,9 +18,11 @@ class UwsgiCacheSharedDict(shared_abstract.SharedDict):
else:
return int.from_bytes(value, 'big')
def set_int(self, key: str, value: int):
def set_int(self, key: str, value: int, expire: Optional[int] = None):
    """Store *value* in the uwsgi cache as 4 big-endian bytes.

    When *expire* is given, the key is deleted again after that many seconds.
    """
    uwsgi.cache_update(key, value.to_bytes(4, 'big'))
    if expire:
        self._expire(key, expire)
def get_str(self, key: str) -> Optional[str]:
value = uwsgi.cache_get(key)
@ -28,9 +31,26 @@ class UwsgiCacheSharedDict(shared_abstract.SharedDict):
else:
return value.decode('utf-8')
def set_str(self, key: str, value: str):
def set_str(self, key: str, value: str, expire: Optional[int] = None):
    """Store *value* in the uwsgi cache UTF-8 encoded.

    When *expire* is given, the key is deleted again after that many seconds.
    """
    uwsgi.cache_update(key, value.encode('utf-8'))
    if expire:
        self._expire(key, expire)
def _expire(self, key: str, expire: int):
    """Delete *key* from the uwsgi cache after *expire* seconds.

    Uses a daemon :class:`threading.Timer` so a pending expiration never
    blocks interpreter shutdown.
    """
    timer = threading.Timer(expire, uwsgi.cache_del, args=[key])
    timer.daemon = True
    timer.start()
def run_locked(func, *args):
    """Call *func* with *args* while holding the global uwsgi lock.

    The lock is always released, even when *func* raises; the exception then
    propagates to the caller.
    """
    uwsgi.lock()
    try:
        return func(*args)
    finally:
        uwsgi.unlock()
def schedule(delay, func, *args):