[fix] calculator: call python using subprocess.Popen

call Python directly with -S and -I parameters to skip loading
of the standard library and the SearXNG module.

The actual calculator is moved to a standalone script: calculator_process.py
This commit is contained in:
Alexandre Flament 2025-01-11 23:12:43 +00:00
parent 94a0b415ef
commit 1a11282ab4
2 changed files with 99 additions and 64 deletions

View file

@ -2,11 +2,10 @@
"""Calculate mathematical expressions using ack#eval """Calculate mathematical expressions using ack#eval
""" """
import ast
import re import re
import operator import sys
from multiprocessing import Process, Queue import subprocess
from typing import Callable from pathlib import Path
import flask import flask
import babel import babel
@ -23,68 +22,46 @@ plugin_id = 'calculator'
logger = logger.getChild(plugin_id) logger = logger.getChild(plugin_id)
operators: dict[type, Callable] = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.BitXor: operator.xor,
ast.USub: operator.neg,
}
def call_calculator(query_py_formatted, timeout):
def _eval_expr(expr): calculator_process_py_path = Path(__file__).parent.absolute() / "calculator_process.py"
""" # see https://docs.python.org/3/using/cmdline.html
>>> _eval_expr('2^6') # -S Disable the import of the module site and the site-dependent manipulations
4 # of sys.path that it entails. Also disable these manipulations if site is
>>> _eval_expr('2**6') # explicitly imported later (call site.main() if you want them to be triggered).
64 # -I Run Python in isolated mode. This also implies -E, -P and -s options.
>>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)') # -E Ignore all PYTHON* environment variables, e.g. PYTHONPATH and PYTHONHOME, that might be set.
-5.0 # -P Dont prepend a potentially unsafe path to sys.path
""" # -s Dont add the user site-packages directory to sys.path.
process = subprocess.Popen( # pylint: disable=R1732
[sys.executable, "-S", "-I", calculator_process_py_path, query_py_formatted],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
try: try:
return _eval(ast.parse(expr, mode='eval').body) stdout, stderr = process.communicate(timeout=timeout)
except ZeroDivisionError: if process.returncode == 0 and not stderr:
# This is undefined return stdout
return "" logger.debug("calculator exited with stderr %s", stderr)
return None
except subprocess.TimeoutExpired:
def _eval(node): process.terminate()
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.BinOp):
return operators[type(node.op)](_eval(node.left), _eval(node.right))
if isinstance(node, ast.UnaryOp):
return operators[type(node.op)](_eval(node.operand))
raise TypeError(node)
def timeout_func(timeout, func, *args, **kwargs):
def handler(q: Queue, func, args, **kwargs): # pylint:disable=invalid-name
try: try:
q.put(func(*args, **kwargs)) # Give the process a grace period to terminate
except: process.communicate(timeout=2)
q.put(None) except subprocess.TimeoutExpired:
raise # Forcefully kill the process
process.kill()
que = Queue() process.communicate()
p = Process(target=handler, args=(que, func, args), kwargs=kwargs) logger.debug("calculator terminated after timeout")
p.start() # Capture any remaining output
p.join(timeout=timeout) return None
ret_val = None finally:
if not p.is_alive(): # Ensure the process is fully cleaned up
ret_val = que.get() if process.poll() is None: # If still running
else: process.kill()
logger.debug("terminate function after timeout is exceeded") process.communicate()
p.terminate()
p.join()
p.close()
return ret_val
def post_search(_request, search): def post_search(_request, search):
@ -122,7 +99,7 @@ def post_search(_request, search):
query_py_formatted = query.replace("^", "**") query_py_formatted = query.replace("^", "**")
# Prevent the runtime from being longer than 50 ms # Prevent the runtime from being longer than 50 ms
result = timeout_func(0.05, _eval_expr, query_py_formatted) result = call_calculator(query_py_formatted, 0.05)
if result is None or result == "": if result is None or result == "":
return True return True
result = babel.numbers.format_decimal(result, locale=ui_locale) result = babel.numbers.format_decimal(result, locale=ui_locale)

View file

@ -0,0 +1,58 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Standalone script to actually calculate mathematical expressions using ast
This is not a module, the SearXNG modules are not available here
"""
import ast
import sys
import operator
from typing import Callable
operators: dict[type, Callable] = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.BitXor: operator.xor,
ast.USub: operator.neg,
}
def _eval_expr(expr):
"""
>>> _eval_expr('2^6')
4
>>> _eval_expr('2**6')
64
>>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
-5.0
"""
try:
return _eval(ast.parse(expr, mode='eval').body)
except ZeroDivisionError:
# This is undefined
return ""
def _eval(node):
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.BinOp):
return operators[type(node.op)](_eval(node.left), _eval(node.right))
if isinstance(node, ast.UnaryOp):
return operators[type(node.op)](_eval(node.operand))
raise TypeError(node)
def main():
print(_eval_expr(sys.argv[1]), end="")
if __name__ == "__main__":
main()