From e2f28849f510e9c422b7ff0991c16c5d59fee4e9 Mon Sep 17 00:00:00 2001
From: Alexandre Flament
Date: Fri, 15 Jul 2022 18:38:32 +0200
Subject: [PATCH] Add searx/shared/shared_redis

Implements:
* a scheduler on Redis
* shared_abstract.SharedDict

---
 searx/shared/__init__.py     |  57 +++++++++------
 searx/shared/redisdb.py      |   2 +-
 searx/shared/shared_redis.py | 138 +++++++++++++++++++++++++++++++++++
 3 files changed, 173 insertions(+), 24 deletions(-)
 create mode 100644 searx/shared/shared_redis.py

diff --git a/searx/shared/__init__.py b/searx/shared/__init__.py
index d10ddb33d..c33a5da55 100644
--- a/searx/shared/__init__.py
+++ b/searx/shared/__init__.py
@@ -1,39 +1,50 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
 import logging
-import importlib
 
 logger = logging.getLogger('searx.shared')
 
-__all__ = ['SharedDict', 'schedule']
+__all__ = ["storage", "schedule"]
+
 
 try:
-    uwsgi = importlib.import_module('uwsgi')
-except:
-    # no uwsgi
-    from .shared_simple import SimpleSharedDict as SharedDict, schedule
+    # First: try to use Redis
+    from .redisdb import client
 
-    logger.info('Use shared_simple implementation')
-else:
+    client().ping()
+    from .shared_redis import RedisCacheSharedDict as SharedDict, schedule
+
+    logger.info('Use shared_redis implementation')
+except Exception as e:
+    # Second: try to use uwsgi
     try:
-        uwsgi.cache_update('dummy', b'dummy')
-        if uwsgi.cache_get('dummy') != b'dummy':
-            raise Exception()
+        import uwsgi
     except:
-        # uwsgi.ini configuration problem: disable all scheduling
-        logger.error(
-            'uwsgi.ini configuration error, add this line to your uwsgi.ini\n'
-            'cache2 = name=searxngcache,items=2000,blocks=2000,blocksize=4096,bitmap=1'
-        )
-        from .shared_simple import SimpleSharedDict as SharedDict
-
-        def schedule(delay, func, *args):
-            return False
+        # Third: fall back to shared_simple
+        from .shared_simple import SimpleSharedDict as SharedDict, schedule
 
+        logger.info('Use shared_simple implementation')
     else:
-        # uwsgi
-        from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule
+        # Make sure uwsgi is okay
+        try:
+            uwsgi.cache_update('dummy', b'dummy')
+            if uwsgi.cache_get('dummy') != b'dummy':
+                raise Exception()  # pylint: disable=raise-missing-from
+        except:
+            # there is an exception on the get/set test: disable all scheduling
+            logger.exception(
+                'uwsgi.ini configuration error, add this line to your uwsgi.ini\n'
+                'cache2 = name=searxngcache,items=2000,blocks=2000,blocksize=4096,bitmap=1\n'
+            )
+            from .shared_simple import SimpleSharedDict as SharedDict
 
-        logger.info('Use shared_uwsgi implementation')
+            def schedule(delay, func, *args):
+                return False
+
+        else:
+            # use uwsgi
+            from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule
+
+            logger.info('Use shared_uwsgi implementation')
 
 
 storage = SharedDict()
diff --git a/searx/shared/redisdb.py b/searx/shared/redisdb.py
index bb7a0eeb4..8a39b7893 100644
--- a/searx/shared/redisdb.py
+++ b/searx/shared/redisdb.py
@@ -30,7 +30,7 @@ logger = logging.getLogger('searx.shared.redis')
 _client = None
 
 
-def client():
+def client() -> redis.Redis:
     global _client  # pylint: disable=global-statement
     if _client is None:
         # not thread safe: in the worst case scenario, two or more clients are
diff --git a/searx/shared/shared_redis.py b/searx/shared/shared_redis.py
new file mode 100644
index 000000000..4470177e6
--- /dev/null
+++ b/searx/shared/shared_redis.py
@@ -0,0 +1,138 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""
+Scheduler:
+* uses only the Redis server time
+* uses two Redis hashes (HSET):
+  * SearXNG_scheduler_ts: for each function, the timestamp of the next call
+  * SearXNG_scheduler_delay: for each function, the delay between the calls
+
+SCHEDULER_REGISTER_FUNCTION registers a new function to call
+
+SCHEDULER_NOW_SCRIPT must be called in a loop. It returns two values:
+* in how many seconds the script must be called again
+* the list of functions to call now
+"""
+
+import threading
+from typing import NamedTuple, Optional, Dict, List, Any
+import logging
+
+from . import shared_abstract
+from .redisdb import client as get_redis_client
+from ..redislib import lua_script_storage
+
+
+logger = logging.getLogger('searx.shared.shared_redis')
+
+
+class ScheduleInfo(NamedTuple):
+    func: Any
+    args: List[Any]
+
+
+SCHEDULED_FUNCTIONS: Dict[str, ScheduleInfo] = {}
+SCHEDULER_THREAD: Optional[threading.Thread] = None
+SCHEDULER_EVENT = threading.Event()
+
+
+SCHEDULER_REGISTER_FUNCTION = """
+local hash_key = 'SearXNG_scheduler_ts'
+local hash_delay_key = 'SearXNG_scheduler_delay'
+local now = redis.call('TIME')[1]
+local redis_key = KEYS[1]
+local delay = ARGV[1]
+redis.call('HSET', hash_key, redis_key, now + delay)
+redis.call('HSET', hash_delay_key, redis_key, delay)
+"""
+
+
+SCHEDULER_NOW_SCRIPT = """
+local hash_key = 'SearXNG_scheduler_ts'
+local hash_delay_key = 'SearXNG_scheduler_delay'
+local now = redis.call('TIME')[1]
+local result = {}
+local next_call_ts_list = {}
+
+local flat_map = redis.call('HGETALL', hash_key)
+for i = 1, #flat_map, 2 do
+    --
+    local redis_key = flat_map[i]
+    local next_call_ts = flat_map[i + 1]
+    -- do we have to exec the function now?
+    if next_call_ts <= now then
+        -- the function must be called now
+        table.insert(result, redis_key)
+        -- schedule next call of the function
+        local delay = redis.call('HGET', hash_delay_key, redis_key)
+        next_call_ts = redis.call('HINCRBY', hash_key, redis_key, delay)
+    end
+    -- update next_call_ts_list
+    -- so later, we can get the minimum value of next_call_ts_list
+    table.insert(next_call_ts_list, next_call_ts)
+end
+
+-- the first result contains the delay before the next call to this script
+local next_call_min_ts = math.min(unpack(next_call_ts_list))
+local next_call_delay = next_call_min_ts - now
+table.insert(result, 1, next_call_delay)
+
+return result
+"""
+
+
+def scheduler_loop():
+    while True:
+        script = lua_script_storage(get_redis_client(), SCHEDULER_NOW_SCRIPT)
+        result = script()
+
+        next_call_delay = result.pop(0)
+
+        # call functions
+        for redis_key in result:
+            redis_key = redis_key.decode()
+            info = SCHEDULED_FUNCTIONS[redis_key]
+            try:
+                logger.debug('Run %s', redis_key)
+                info.func(*info.args)
+            except Exception:
+                logger.exception("Error calling %s", redis_key)
+
+        # wait for the time defined by the Redis script (next_call_delay)
+        # or continue if another function has been scheduled (SCHEDULER_EVENT.set())
+        SCHEDULER_EVENT.clear()
+        SCHEDULER_EVENT.wait(timeout=next_call_delay)
+
+
+def schedule(delay, func, *args):
+    global SCHEDULER_THREAD
+
+    redis_key = func.__module__ + '.' + func.__qualname__
+    SCHEDULED_FUNCTIONS[redis_key] = ScheduleInfo(func, args)
+    script = lua_script_storage(get_redis_client(), SCHEDULER_REGISTER_FUNCTION)
+    script(args=[delay], keys=[redis_key])
+    # wake the scheduler thread up or start it
+    if SCHEDULER_THREAD is not None:
+        # the scheduler thread has already been started: update the waiting time
+        SCHEDULER_EVENT.set()
+    else:
+        # start the scheduler thread
+        SCHEDULER_THREAD = threading.Thread(target=scheduler_loop, name='scheduler')
+        SCHEDULER_THREAD.daemon = True
+        SCHEDULER_THREAD.start()
+    return True
+
+
+class RedisCacheSharedDict(shared_abstract.SharedDict):
+    def get_int(self, key: str) -> Optional[int]:
+        value = get_redis_client().get(key)
+        return None if value is None else int(value)
+
+    def set_int(self, key: str, value: int):
+        get_redis_client().set(key, str(value).encode())
+
+    def get_str(self, key: str) -> Optional[str]:
+        value = get_redis_client().get(key)
+        return None if value is None else value.decode()
+
+    def set_str(self, key: str, value: str):
+        get_redis_client().set(key, value.encode())
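
Usage sketch (not part of the patch): a minimal example of how a caller might use the
backend-agnostic API exported by searx/shared/__init__.py; the function name update_data
and the key example_key are illustrative only.

    from searx.shared import schedule, storage

    def update_data():
        # the stored value is visible to every worker, whichever backend was selected
        storage.set_str('example_key', 'example_value')

    # call update_data() every 3600 seconds; with the Redis backend this registers
    # the function in the SearXNG_scheduler_ts / SearXNG_scheduler_delay hashes and
    # starts (or wakes up) the scheduler thread
    schedule(3600, update_data)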