forked from zaclys/searxng
6e5f22e558
Implementations of the *traits* of the engines. Engine's traits are fetched from the origin engine and stored in a JSON file in the *data folder*. Most often traits are languages and region codes and their mapping from SearXNG's representation to the representation in the origin search engine. To load traits from the persistence:: searx.enginelib.traits.EngineTraitsMap.from_data() For new traits new properties can be added to the class:: searx.enginelib.traits.EngineTraits .. hint:: Implementation is downward compatible to the deprecated *supported_languages method* from the vintage implementation. The vintage code is tagged as *deprecated* an can be removed when all engines has been ported to the *traits method*. Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
248 lines
7.8 KiB
Python
248 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pathlib
|
|
import csv
|
|
import hashlib
|
|
import hmac
|
|
import re
|
|
import inspect
|
|
import itertools
|
|
from datetime import datetime, timedelta
|
|
from typing import Iterable, List, Tuple, Dict, TYPE_CHECKING
|
|
|
|
from io import StringIO
|
|
from codecs import getincrementalencoder
|
|
|
|
from flask_babel import gettext, format_date
|
|
|
|
from searx import logger, settings
|
|
from searx.engines import OTHER_CATEGORY
|
|
|
|
if TYPE_CHECKING:
|
|
from searx.enginelib import Engine
|
|
|
|
|
|
VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')
|
|
|
|
logger = logger.getChild('webutils')
|
|
|
|
|
|
class UnicodeWriter:
|
|
"""
|
|
A CSV writer which will write rows to CSV file "f",
|
|
which is encoded in the given encoding.
|
|
"""
|
|
|
|
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
|
|
# Redirect output to a queue
|
|
self.queue = StringIO()
|
|
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
|
|
self.stream = f
|
|
self.encoder = getincrementalencoder(encoding)()
|
|
|
|
def writerow(self, row):
|
|
self.writer.writerow(row)
|
|
# Fetch UTF-8 output from the queue ...
|
|
data = self.queue.getvalue()
|
|
data = data.strip('\x00')
|
|
# ... and re-encode it into the target encoding
|
|
data = self.encoder.encode(data)
|
|
# write to the target stream
|
|
self.stream.write(data.decode())
|
|
# empty queue
|
|
self.queue.truncate(0)
|
|
|
|
def writerows(self, rows):
|
|
for row in rows:
|
|
self.writerow(row)
|
|
|
|
|
|
def get_themes(templates_path):
|
|
"""Returns available themes list."""
|
|
return os.listdir(templates_path)
|
|
|
|
|
|
def get_hash_for_file(file: pathlib.Path) -> str:
|
|
m = hashlib.sha1()
|
|
with file.open('rb') as f:
|
|
m.update(f.read())
|
|
return m.hexdigest()
|
|
|
|
|
|
def get_static_files(static_path: str) -> Dict[str, str]:
|
|
static_files: Dict[str, str] = {}
|
|
static_path_path = pathlib.Path(static_path)
|
|
|
|
def walk(path: pathlib.Path):
|
|
for file in path.iterdir():
|
|
if file.name.startswith('.'):
|
|
# ignore hidden file
|
|
continue
|
|
if file.is_file():
|
|
static_files[str(file.relative_to(static_path_path))] = get_hash_for_file(file)
|
|
if file.is_dir() and file.name not in ('node_modules', 'src'):
|
|
# ignore "src" and "node_modules" directories
|
|
walk(file)
|
|
|
|
walk(static_path_path)
|
|
return static_files
|
|
|
|
|
|
def get_result_templates(templates_path):
|
|
result_templates = set()
|
|
templates_path_length = len(templates_path) + 1
|
|
for directory, _, files in os.walk(templates_path):
|
|
if directory.endswith('result_templates'):
|
|
for filename in files:
|
|
f = os.path.join(directory[templates_path_length:], filename)
|
|
result_templates.add(f)
|
|
return result_templates
|
|
|
|
|
|
def new_hmac(secret_key, url):
|
|
return hmac.new(secret_key.encode(), url, hashlib.sha256).hexdigest()
|
|
|
|
|
|
def is_hmac_of(secret_key, value, hmac_to_check):
|
|
hmac_of_value = new_hmac(secret_key, value)
|
|
return len(hmac_of_value) == len(hmac_to_check) and hmac.compare_digest(hmac_of_value, hmac_to_check)
|
|
|
|
|
|
def prettify_url(url, max_length=74):
|
|
if len(url) > max_length:
|
|
chunk_len = int(max_length / 2 + 1)
|
|
return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
|
|
else:
|
|
return url
|
|
|
|
|
|
def contains_cjko(s: str) -> bool:
|
|
"""This function check whether or not a string contains Chinese, Japanese,
|
|
or Korean characters. It employs regex and uses the u escape sequence to
|
|
match any character in a set of Unicode ranges.
|
|
|
|
Args:
|
|
s (str): string to be checked.
|
|
|
|
Returns:
|
|
bool: True if the input s contains the characters and False otherwise.
|
|
"""
|
|
unicode_ranges = (
|
|
'\u4e00-\u9fff' # Chinese characters
|
|
'\u3040-\u309f' # Japanese hiragana
|
|
'\u30a0-\u30ff' # Japanese katakana
|
|
'\u4e00-\u9faf' # Japanese kanji
|
|
'\uac00-\ud7af' # Korean hangul syllables
|
|
'\u1100-\u11ff' # Korean hangul jamo
|
|
)
|
|
return bool(re.search(fr'[{unicode_ranges}]', s))
|
|
|
|
|
|
def regex_highlight_cjk(word: str) -> str:
|
|
"""Generate the regex pattern to match for a given word according
|
|
to whether or not the word contains CJK characters or not.
|
|
If the word is and/or contains CJK character, the regex pattern
|
|
will match standalone word by taking into account the presence
|
|
of whitespace before and after it; if not, it will match any presence
|
|
of the word throughout the text, ignoring the whitespace.
|
|
|
|
Args:
|
|
word (str): the word to be matched with regex pattern.
|
|
|
|
Returns:
|
|
str: the regex pattern for the word.
|
|
"""
|
|
rword = re.escape(word)
|
|
if contains_cjko(rword):
|
|
return fr'({rword})'
|
|
else:
|
|
return fr'\b({rword})(?!\w)'
|
|
|
|
|
|
def highlight_content(content, query):
|
|
|
|
if not content:
|
|
return None
|
|
|
|
# ignoring html contents
|
|
# TODO better html content detection
|
|
if content.find('<') != -1:
|
|
return content
|
|
|
|
querysplit = query.split()
|
|
queries = []
|
|
for qs in querysplit:
|
|
qs = qs.replace("'", "").replace('"', '').replace(" ", "")
|
|
if len(qs) > 0:
|
|
queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
|
|
if len(queries) > 0:
|
|
for q in set(queries):
|
|
content = re.sub(
|
|
regex_highlight_cjk(q), f'<span class="highlight">{q}</span>'.replace('\\', r'\\'), content
|
|
)
|
|
return content
|
|
|
|
|
|
def searxng_l10n_timespan(dt: datetime) -> str: # pylint: disable=invalid-name
|
|
"""Returns a human-readable and translated string indicating how long ago
|
|
a date was in the past / the time span of the date to the present.
|
|
|
|
On January 1st, midnight, the returned string only indicates how many years
|
|
ago the date was.
|
|
"""
|
|
# TODO, check if timezone is calculated right # pylint: disable=fixme
|
|
d = dt.date()
|
|
t = dt.time()
|
|
if d.month == 1 and d.day == 1 and t.hour == 0 and t.minute == 0 and t.second == 0:
|
|
return str(d.year)
|
|
if dt.replace(tzinfo=None) >= datetime.now() - timedelta(days=1):
|
|
timedifference = datetime.now() - dt.replace(tzinfo=None)
|
|
minutes = int((timedifference.seconds / 60) % 60)
|
|
hours = int(timedifference.seconds / 60 / 60)
|
|
if hours == 0:
|
|
return gettext('{minutes} minute(s) ago').format(minutes=minutes)
|
|
return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)
|
|
return format_date(dt)
|
|
|
|
|
|
def is_flask_run_cmdline():
|
|
"""Check if the application was started using "flask run" command line
|
|
|
|
Inspect the callstack.
|
|
See https://github.com/pallets/flask/blob/master/src/flask/__main__.py
|
|
|
|
Returns:
|
|
bool: True if the application was started using "flask run".
|
|
"""
|
|
frames = inspect.stack()
|
|
if len(frames) < 2:
|
|
return False
|
|
return frames[-2].filename.endswith('flask/cli.py')
|
|
|
|
|
|
DEFAULT_GROUP_NAME = 'others'
|
|
|
|
|
|
def group_engines_in_tab(engines: Iterable[Engine]) -> List[Tuple[str, Iterable[Engine]]]:
|
|
"""Groups an Iterable of engines by their first non tab category"""
|
|
|
|
def get_group(eng):
|
|
non_tab_categories = [
|
|
c for c in eng.categories if c not in list(settings['categories_as_tabs'].keys()) + [OTHER_CATEGORY]
|
|
]
|
|
return non_tab_categories[0] if len(non_tab_categories) > 0 else DEFAULT_GROUP_NAME
|
|
|
|
groups = itertools.groupby(sorted(engines, key=get_group), get_group)
|
|
|
|
def group_sort_key(group):
|
|
return (group[0] == DEFAULT_GROUP_NAME, group[0].lower())
|
|
|
|
sorted_groups = sorted(((name, list(engines)) for name, engines in groups), key=group_sort_key)
|
|
|
|
def engine_sort_key(engine):
|
|
return (engine.about.get('language', ''), engine.name)
|
|
|
|
return [(groupname, sorted(engines, key=engine_sort_key)) for groupname, engines in sorted_groups]
|