The checker requires Redis

Remove the abstraction in searx.shared.SharedDict.
Implement a basic and dedicated scheduler for the checker using a Redis script.
This commit is contained in:
Alexandre Flament 2022-07-15 18:38:32 +02:00
parent d764d94a70
commit fe419e355b
12 changed files with 167 additions and 237 deletions

View File

@ -93,9 +93,8 @@ def init(app, settings):
if not settings['server']['limiter']: if not settings['server']['limiter']:
return False return False
logger.debug("init limiter DB") # pylint: disable=undefined-variable if not redisdb.client():
if not redisdb.init(): logger.error("The limiter requires Redis") # pylint: disable=undefined-variable
logger.error("init limiter DB failed!!!") # pylint: disable=undefined-variable
return False return False
app.before_request(pre_request) app.before_request(pre_request)

View File

@ -2,3 +2,5 @@
from .impl import Checker from .impl import Checker
from .background import initialize, get_result from .background import initialize, get_result
__all__ = ('Checker', 'initialize', 'get_result')

View File

@ -1,26 +1,28 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint # lint: pylint
# pylint: disable=missing-module-docstring # pylint: disable=missing-module-docstring
# pyright: strict # pyright: basic
import json import json
import random
import time import time
import threading import threading
import os import os
import signal import signal
from typing import Dict, Union, List, Any, Tuple from typing import Dict, Union, List, Any, Tuple, Optional
from typing_extensions import TypedDict, Literal from typing_extensions import TypedDict, Literal
import redis.exceptions
from searx import logger, settings, searx_debug from searx import logger, settings, searx_debug
from searx.shared.redisdb import client as get_redis_client
from searx.exceptions import SearxSettingsException from searx.exceptions import SearxSettingsException
from searx.search.processors import PROCESSORS from searx.search.processors import PROCESSORS
from searx.search.checker import Checker from searx.search.checker import Checker
from searx.shared import schedule, storage # pyright: ignore from searx.search.checker.scheduler import scheduler_function
CHECKER_RESULT = 'CHECKER_RESULT' REDIS_RESULT_KEY = 'SearXNG_checker_result'
running = threading.Lock() REDIS_LOCK_KEY = 'SearXNG_checker_lock'
CheckerResult = Union['CheckerOk', 'CheckerErr', 'CheckerOther'] CheckerResult = Union['CheckerOk', 'CheckerErr', 'CheckerOther']
@ -77,20 +79,24 @@ def _get_interval(every: Any, error_msg: str) -> Tuple[int, int]:
return (every[0], every[1]) return (every[0], every[1])
def _get_every():
every = settings.get('checker', {}).get('scheduling', {}).get('every', (300, 1800))
return _get_interval(every, 'checker.scheduling.every is not a int or list')
def get_result() -> CheckerResult: def get_result() -> CheckerResult:
serialized_result = storage.get_str(CHECKER_RESULT) client = get_redis_client()
if serialized_result is not None: if client is None:
return json.loads(serialized_result) # without Redis, the checker is disabled
return {'status': 'unknown'} return {'status': 'disabled'}
serialized_result: Optional[bytes] = client.get(REDIS_RESULT_KEY)
if serialized_result is None:
# the Redis key does not exist
return {'status': 'unknown'}
return json.loads(serialized_result)
def _set_result(result: CheckerResult): def _set_result(result: CheckerResult):
storage.set_str(CHECKER_RESULT, json.dumps(result)) client = get_redis_client()
if client is None:
# without Redis, the function does nothing
return
client.set(REDIS_RESULT_KEY, json.dumps(result))
def _timestamp(): def _timestamp():
@ -98,41 +104,29 @@ def _timestamp():
def run(): def run():
if not running.acquire(blocking=False): # pylint: disable=consider-using-with
return
try: try:
logger.info('Starting checker') # use a Redis lock to make sure there is no checker running at the same time
result: CheckerOk = {'status': 'ok', 'engines': {}, 'timestamp': _timestamp()} # (this should not happen, this is a safety measure)
for name, processor in PROCESSORS.items(): with get_redis_client().lock(REDIS_LOCK_KEY, blocking_timeout=60, timeout=3600):
logger.debug('Checking %s engine', name) logger.info('Starting checker')
checker = Checker(processor) result: CheckerOk = {'status': 'ok', 'engines': {}, 'timestamp': _timestamp()}
checker.run() for name, processor in PROCESSORS.items():
if checker.test_results.successful: logger.debug('Checking %s engine', name)
result['engines'][name] = {'success': True} checker = Checker(processor)
else: checker.run()
result['engines'][name] = {'success': False, 'errors': checker.test_results.errors} if checker.test_results.successful:
result['engines'][name] = {'success': True}
else:
result['engines'][name] = {'success': False, 'errors': checker.test_results.errors}
_set_result(result) _set_result(result)
logger.info('Check done') logger.info('Check done')
except redis.exceptions.LockError:
_set_result({'status': 'error', 'timestamp': _timestamp()})
logger.exception('Error while running the checker')
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
_set_result({'status': 'error', 'timestamp': _timestamp()}) _set_result({'status': 'error', 'timestamp': _timestamp()})
logger.exception('Error while running the checker') logger.exception('Error while running the checker')
finally:
running.release()
def _run_with_delay():
every = _get_every()
delay = random.randint(0, every[1] - every[0])
logger.debug('Start checker in %i seconds', delay)
time.sleep(delay)
run()
def _start_scheduling():
every = _get_every()
if schedule(every[0], _run_with_delay):
run()
def _signal_handler(_signum: int, _frame: Any): def _signal_handler(_signum: int, _frame: Any):
@ -147,27 +141,31 @@ def initialize():
logger.info('Send SIGUSR1 signal to pid %i to start the checker', os.getpid()) logger.info('Send SIGUSR1 signal to pid %i to start the checker', os.getpid())
signal.signal(signal.SIGUSR1, _signal_handler) signal.signal(signal.SIGUSR1, _signal_handler)
# disabled by default
_set_result({'status': 'disabled'})
# special case when debug is activate # special case when debug is activate
if searx_debug and settings.get('checker', {}).get('off_when_debug', True): if searx_debug and settings['checker']['off_when_debug']:
logger.info('debug mode: checker is disabled') logger.info('debug mode: checker is disabled')
return return
# check value of checker.scheduling.every now # check value of checker.scheduling.every now
scheduling = settings.get('checker', {}).get('scheduling', None) scheduling = settings['checker']['scheduling']
if scheduling is None or not scheduling: if scheduling is None or not scheduling:
logger.info('Checker scheduler is disabled') logger.info('Checker scheduler is disabled')
return return
# # make sure there is a Redis connection
_set_result({'status': 'unknown'}) if get_redis_client() is None:
logger.error('The checker requires Redis')
return
start_after = scheduling.get('start_after', (300, 1800)) # start the background scheduler
start_after = _get_interval(start_after, 'checker.scheduling.start_after is not a int or list') every_range = _get_interval(scheduling.get('every', (300, 1800)), 'checker.scheduling.every is not a int or list')
delay = random.randint(start_after[0], start_after[1]) start_after_range = _get_interval(
logger.info('Start checker in %i seconds', delay) scheduling.get('start_after', (300, 1800)), 'checker.scheduling.start_after is not a int or list'
t = threading.Timer(delay, _start_scheduling) )
t = threading.Thread(
target=scheduler_function,
args=(start_after_range[0], start_after_range[1], every_range[0], every_range[1], run),
name='checker_scheduler',
)
t.daemon = True t.daemon = True
t.start() t.start()

View File

@ -0,0 +1,36 @@
-- SPDX-License-Identifier: AGPL-3.0-or-later
--
-- This script is not a string in scheduler.py, so editors can provide syntax highlighting.
-- The Redis KEY is defined here and not in Python on purpose:
-- only this LUA script can read and update this key to avoid lock and concurrency issues.
local redis_key = 'SearXNG_checker_next_call_ts'
local now = redis.call('TIME')[1]
local start_after_from = ARGV[1]
local start_after_to = ARGV[2]
local every_from = ARGV[3]
local every_to = ARGV[4]
local next_call_ts = redis.call('GET', redis_key)
if (next_call_ts == false or next_call_ts == nil) then
-- the scheduler has never run on this Redis instance, so:
-- 1/ the scheduler does not run now
-- 2/ the next call is a random time between start_after_from and start_after_to
local delay = start_after_from + math.random(start_after_to - start_after_from)
redis.call('SET', redis_key, now + delay)
return { false, delay }
end
-- next_call_ts is defined
-- --> if now is lower than next_call_ts then we don't run the embedded checker
-- --> if now is higher then we update next_call_ts and ask to run the embedded checker now.
local call_now = next_call_ts <= now
if call_now then
-- the checker runs now, define the timestamp of the next call:
-- this is a random delay between every_from and every_to
local periodic_delay = every_from + math.random(every_to - every_from)
next_call_ts = redis.call('INCRBY', redis_key, periodic_delay)
end
return { call_now, next_call_ts - now }

View File

@ -0,0 +1,57 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring
"""Lame scheduler which use Redis as a source of truth:
* the Redis key SearXNG_checker_next_call_ts contains the next time the embedded checker should run.
* to avoid lock, a unique Redis script reads and updates the Redis key SearXNG_checker_next_call_ts.
* this Redis script returns a list of two elements:
* the first one is a boolean. If True, the embedded checker must run now in this worker.
* the second element is the delay in second to wait before the next call to the Redis script.
This scheduler is not generic on purpose: if more feature are required, a dedicate scheduler must be used
(= a better scheduler should not use the web workers)
"""
import logging
import time
import importlib
from typing import Callable
from searx.shared.redisdb import client as get_redis_client
from searx.redislib import lua_script_storage
logger = logging.getLogger('searx.search.checker')
def scheduler_function(start_after_from: int, start_after_to: int, every_from: int, every_to: int, callback: Callable):
"""Run the checker periodically. The function never returns.
Parameters:
* start_after_from and start_after_to: when to call "callback" for the first on the Redis instance
* every_from and every_to: after the first call, how often to call "callback"
There is no issue:
* to call this function is multiple workers
* to kill workers at any time as long there is one at least one worker
"""
scheduler_now_script = importlib.resources.read_text(__package__, "scheduler.lua")
while True:
# ask the Redis script what to do
# the script says
# * if the checker must run now.
# * how to long to way before calling the script again (it can be call earlier, but not later).
script = lua_script_storage(get_redis_client(), scheduler_now_script)
call_now, wait_time = script(args=[start_after_from, start_after_to, every_from, every_to])
# does the worker run the checker now?
if call_now:
# run the checker
try:
callback()
except Exception: # pylint: disable=broad-except
logger.exception("Error calling the embedded checker")
# only worker display the wait_time
logger.info("Next call to the checker in %s seconds", wait_time)
# wait until the next call
time.sleep(wait_time)

View File

@ -225,7 +225,8 @@ SCHEMA = {
'plugins': SettingsValue(list, []), 'plugins': SettingsValue(list, []),
'enabled_plugins': SettingsValue((None, list), None), 'enabled_plugins': SettingsValue((None, list), None),
'checker': { 'checker': {
'off_when_debug': SettingsValue(bool, True), 'off_when_debug': SettingsValue(bool, True, None),
'scheduling': SettingsValue((None, dict), None, None),
}, },
'categories_as_tabs': SettingsValue(dict, CATEGORIES_AS_TABS), 'categories_as_tabs': SettingsValue(dict, CATEGORIES_AS_TABS),
'engines': SettingsValue(list, []), 'engines': SettingsValue(list, []),

View File

@ -1,39 +1,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""Initialization of a *shared* storage.
"""
import logging from . import redisdb
import importlib
logger = logging.getLogger('searx.shared')
__all__ = ['SharedDict', 'schedule']
try:
uwsgi = importlib.import_module('uwsgi')
except:
# no uwsgi
from .shared_simple import SimpleSharedDict as SharedDict, schedule
logger.info('Use shared_simple implementation')
else:
try:
uwsgi.cache_update('dummy', b'dummy')
if uwsgi.cache_get('dummy') != b'dummy':
raise Exception()
except:
# uwsgi.ini configuration problem: disable all scheduling
logger.error(
'uwsgi.ini configuration error, add this line to your uwsgi.ini\n'
'cache2 = name=searxngcache,items=2000,blocks=2000,blocksize=4096,bitmap=1'
)
from .shared_simple import SimpleSharedDict as SharedDict
def schedule(delay, func, *args):
return False
else:
# uwsgi
from .shared_uwsgi import UwsgiCacheSharedDict as SharedDict, schedule
logger.info('Use shared_uwsgi implementation')
storage = SharedDict()

View File

@ -26,26 +26,20 @@ import redis
from searx import get_setting from searx import get_setting
logger = logging.getLogger('searx.shared.redis') logger = logging.getLogger('searx.shared.redisdb')
_client = None _client = None
def client(): def client() -> redis.Redis:
global _client # pylint: disable=global-statement
if _client is None:
# not thread safe: in the worst case scenario, two or more clients are
# initialized only one is kept, the others are garbage collected.
_client = redis.Redis.from_url(get_setting('redis.url'))
return _client return _client
def init(): def initialize():
global _client # pylint: disable=global-statement
try: try:
c = client() _client = redis.Redis.from_url(get_setting('redis.url'))
logger.info("connected redis DB --> %s", c.acl_whoami()) logger.info("connected redis: %s", get_setting('redis.url'))
return True
except redis.exceptions.ConnectionError as exc: except redis.exceptions.ConnectionError as exc:
_pw = pwd.getpwuid(os.getuid()) _pw = pwd.getpwuid(os.getuid())
logger.error("[%s (%s)] can't connect redis DB ...", _pw.pw_name, _pw.pw_uid) logger.error("[%s (%s)] can't connect redis DB ...", _pw.pw_name, _pw.pw_uid)
logger.error(" %s", exc) logger.error(" %s", exc)
return False

View File

@ -1,22 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pyright: strict
from abc import ABC, abstractmethod
from typing import Optional
class SharedDict(ABC):
@abstractmethod
def get_int(self, key: str) -> Optional[int]:
pass
@abstractmethod
def set_int(self, key: str, value: int):
pass
@abstractmethod
def get_str(self, key: str) -> Optional[str]:
pass
@abstractmethod
def set_str(self, key: str, value: str):
pass

View File

@ -1,40 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import threading
from typing import Optional
from . import shared_abstract
class SimpleSharedDict(shared_abstract.SharedDict):
__slots__ = ('d',)
def __init__(self):
self.d = {}
def get_int(self, key: str) -> Optional[int]:
return self.d.get(key, None)
def set_int(self, key: str, value: int):
self.d[key] = value
def get_str(self, key: str) -> Optional[str]:
return self.d.get(key, None)
def set_str(self, key: str, value: str):
self.d[key] = value
def schedule(delay, func, *args):
def call_later():
t = threading.Timer(delay, wrapper)
t.daemon = True
t.start()
def wrapper():
call_later()
func(*args)
call_later()
return True

View File

@ -1,64 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import time
from typing import Optional
import uwsgi # pyright: ignore # pylint: disable=E0401
from . import shared_abstract
_last_signal = 10
class UwsgiCacheSharedDict(shared_abstract.SharedDict):
def get_int(self, key: str) -> Optional[int]:
value = uwsgi.cache_get(key)
if value is None:
return value
else:
return int.from_bytes(value, 'big')
def set_int(self, key: str, value: int):
b = value.to_bytes(4, 'big')
uwsgi.cache_update(key, b)
def get_str(self, key: str) -> Optional[str]:
value = uwsgi.cache_get(key)
if value is None:
return value
else:
return value.decode('utf-8')
def set_str(self, key: str, value: str):
b = value.encode('utf-8')
uwsgi.cache_update(key, b)
def schedule(delay, func, *args):
"""
Can be implemented using a spooler.
https://uwsgi-docs.readthedocs.io/en/latest/PythonDecorators.html
To make the uwsgi configuration simple, use the alternative implementation.
"""
global _last_signal
def sighandler(signum):
now = int(time.time())
key = 'scheduler_call_time_signal_' + str(signum)
uwsgi.lock()
try:
updating = uwsgi.cache_get(key)
if updating is not None:
updating = int.from_bytes(updating, 'big')
if now - updating < delay:
return
uwsgi.cache_update(key, now.to_bytes(4, 'big'))
finally:
uwsgi.unlock()
func(*args)
signal_num = _last_signal
_last_signal += 1
uwsgi.register_signal(signal_num, 'worker', sighandler)
uwsgi.add_timer(signal_num, delay)
return True

View File

@ -120,6 +120,7 @@ from searx.locales import (
# renaming names from searx imports ... # renaming names from searx imports ...
from searx.autocomplete import search_autocomplete, backends as autocomplete_backends from searx.autocomplete import search_autocomplete, backends as autocomplete_backends
from searx.languages import language_codes as languages from searx.languages import language_codes as languages
from searx.shared.redisdb import initialize as redis_initialize
from searx.search import SearchWithPlugins, initialize as search_initialize from searx.search import SearchWithPlugins, initialize as search_initialize
from searx.network import stream as http_stream, set_context_network_name from searx.network import stream as http_stream, set_context_network_name
from searx.search.checker import get_result as checker_get_result from searx.search.checker import get_result as checker_get_result
@ -1384,6 +1385,7 @@ werkzeug_reloader = flask_run_development or (searx_debug and __name__ == "__mai
if not werkzeug_reloader or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"): if not werkzeug_reloader or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"):
locales_initialize() locales_initialize()
_INFO_PAGES = infopage.InfoPageSet() _INFO_PAGES = infopage.InfoPageSet()
redis_initialize()
plugin_initialize(app) plugin_initialize(app)
search_initialize(enable_checker=True, check_network=True, enable_metrics=settings['general']['enable_metrics']) search_initialize(enable_checker=True, check_network=True, enable_metrics=settings['general']['enable_metrics'])