# SPDX-License-Identifier: AGPL-3.0-or-later
"""This module holds the *data* created by::
make data.all
"""
__all__ = [
    'ENGINE_TRAITS',
    'USER_AGENTS',
    'EXTERNAL_URLS',
    'WIKIDATA_UNITS',
    'EXTERNAL_BANGS',
    'LOCALES',
    'ahmia_blacklist_loader',
    'fetch_engine_descriptions',
    'fetch_iso4217_from_user',
    'fetch_name_from_iso4217',
    'fetch_osm_key_label',
]

import re
import unicodedata
import json
import sqlite3
from contextlib import contextmanager
from typing import Dict, Generator, List, Optional
from functools import lru_cache
from pathlib import Path

data_dir = Path(__file__).parent
# cache of read-only SQLite connections, shared between all threads
data_connection_local = {}

def _load(filename):
    with open(data_dir / filename, encoding='utf-8') as f:
        return json.load(f)

@contextmanager
def sql_connection(filename: str) -> Generator[sqlite3.Connection, None, None]:
    """Return a read-only SQLite connection to ``filename``.

    The filename is relative to ``searx/data``.  Connections are cached per
    filename and shared between threads, so repeated calls return the same
    connection.
    """
    connection = data_connection_local.get(filename)
    if connection is None:
        data_filename = str(data_dir / filename)
        # Open the database in read-only mode and allow the connection to be
        # shared between threads: https://www.sqlite.org/faq.html#q6
        # See https://ricardoanderegg.com/posts/python-sqlite-thread-safety/
        # and https://docs.python.org/3/library/sqlite3.html#sqlite3.threadsafety
        # sqlite3.threadsafety is hard coded to 1; the only reliable way to
        # check whether multithreading is supported is to run this query:
        #   SELECT * FROM pragma_compile_options WHERE compile_options LIKE 'THREADSAFE=%'
        # but THREADSAFE=1 on Linux anyway.
        connection = sqlite3.connect(f'file:{data_filename}?mode=ro', uri=True, check_same_thread=False)
        # 512KB of cache instead of 2MB (512KB / 4KB = 128, 4KB is the default page size)
        # https://www.sqlite.org/pragma.html#pragma_cache_size
        connection.execute("PRAGMA cache_size = 128;")
        data_connection_local[filename] = connection
    yield connection

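# Usage sketch (illustrative, not part of the module): the connection is
# cached, so calling this repeatedly, even from different threads, is cheap.
# The table name below is the one queried by the currency functions further
# down in this file.
#
#   with sql_connection("currencies.db") as connection:
#       row = connection.execute("SELECT count(*) FROM currencies").fetchone()
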
def fetch_engine_descriptions(language) -> Dict[str, List[str]]:
    """Return engine description and source for each engine name."""
    with sql_connection("engine_descriptions.db") as conn:
        res = conn.execute("SELECT engine, description, source FROM engine_descriptions WHERE language=?", (language,))
        return {result[0]: [result[1], result[2]] for result in res.fetchall()}

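# Illustrative result shape (actual rows depend on the shipped
# engine_descriptions.db): the dict maps an engine name to a two-item list
# [description, source], e.g.
#
#   fetch_engine_descriptions('en')
#   # -> {'wikipedia': ['the free encyclopedia ...', 'wikidata'], ...}
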
def _normalize_name(name):
    name = name.lower().replace('-', ' ').rstrip('s')
    name = re.sub(' +', ' ', name)
    return unicodedata.normalize('NFKD', name).lower()

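# The normalization above is deterministic, e.g.:
#
#   _normalize_name('US-Dollars')  # -> 'us dollar'
#
# (lower-cased, '-' replaced by a space, trailing 's' stripped, runs of
# spaces collapsed, then NFKD-normalized)
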
@lru_cache(10)
def fetch_iso4217_from_user(name: str) -> Optional[str]:
    with sql_connection("currencies.db") as connection:
        # try the iso4217
        res = connection.execute("SELECT iso4217 FROM currencies WHERE lower(iso4217)=? LIMIT 1", (name.lower(),))
        result = res.fetchone()
        if result:
            return result[0]

        # try the currency names
        name = _normalize_name(name)
        res = connection.execute("SELECT iso4217 FROM currencies WHERE name=?", (name,))
        result = list(set(result[0] for result in res.fetchall()))
        if len(result) == 1:
            return result[0]

    # ambiguity --> return nothing
    return None

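# Hedged examples (results depend on the rows in the shipped currencies.db):
#
#   fetch_iso4217_from_user('eur')    # -> 'EUR', matched by ISO 4217 code
#   fetch_iso4217_from_user('Euros')  # -> 'EUR', if the normalized name
#                                     #    'euro' maps to exactly one code
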
@lru_cache(10)
def fetch_name_from_iso4217(iso4217: str, language: str) -> Optional[str]:
    with sql_connection("currencies.db") as connection:
        res = connection.execute("SELECT name FROM currencies WHERE iso4217=? AND language=?", (iso4217, language))
        result = [result[0] for result in res.fetchall()]
        if len(result) == 1:
            return result[0]
    return None

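# Hedged example (depends on the shipped currencies.db):
#
#   fetch_name_from_iso4217('EUR', 'en')  # -> the stored English name for
#                                         #    'EUR' if exactly one row
#                                         #    matches, else None
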
@lru_cache(100)
def fetch_osm_key_label(key_name: str, language: str) -> Optional[str]:
    if key_name.startswith('currency:'):
        # currency:EUR --> resolve the name through fetch_name_from_iso4217
        # see https://wiki.openstreetmap.org/wiki/Key%3Acurrency
        # and for example https://taginfo.openstreetmap.org/keys/currency:EUR#values
        # but there is also currency=EUR (currently not handled)
        # https://taginfo.openstreetmap.org/keys/currency#values
        currency = key_name.split(':')
        if len(currency) > 1:
            label = fetch_name_from_iso4217(currency[1], language)
            if label:
                return label
            return currency[1]

    language = language.lower()
    language_short = language.split('-')[0]
    with sql_connection("osm_keys_tags.db") as conn:
        res = conn.execute(
            "SELECT language, label FROM osm_keys WHERE name=? AND language in (?, ?, 'en')",
            (key_name, language, language_short),
        )
        result = {result[0]: result[1] for result in res.fetchall()}
    return result.get(language) or result.get(language_short) or result.get('en')

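# Hedged examples (labels depend on the shipped osm_keys_tags.db):
#
#   fetch_osm_key_label('currency:EUR', 'en')  # resolved via
#                                              # fetch_name_from_iso4217,
#                                              # falling back to 'EUR'
#   fetch_osm_key_label('website', 'de-CH')    # tries 'de-ch', then 'de',
#                                              # then 'en'
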
@lru_cache(100)
def fetch_osm_tag_label(tag_key: str, tag_value: str, language: str) -> Optional[str]:
    language = language.lower()
    language_short = language.split('-')[0]
    with sql_connection("osm_keys_tags.db") as conn:
        res = conn.execute(
            "SELECT language, label FROM osm_tags WHERE tag_key=? AND tag_value=? AND language in (?, ?, 'en')",
            (tag_key, tag_value, language, language_short),
        )
        result = {result[0]: result[1] for result in res.fetchall()}
    return result.get(language) or result.get(language_short) or result.get('en')

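# Same fallback chain as fetch_osm_key_label, e.g. (hedged, DB dependent):
#
#   fetch_osm_tag_label('amenity', 'cafe', 'fr')  # label for amenity=cafe in
#                                                 # French, else English
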
def ahmia_blacklist_loader():
    """Load data from `ahmia_blacklist.txt` and return a list of MD5 values of
    onion names.  The MD5 values are fetched by::

        searxng_extra/update/update_ahmia_blacklist.py

    This function is used by :py:mod:`searx.plugins.ahmia_filter`.
    """
    with open(data_dir / 'ahmia_blacklist.txt', encoding='utf-8') as f:
        return f.read().split()

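# Sketch of how a consumer such as searx.plugins.ahmia_filter can test an
# onion hostname against the list; hashlib and the variable names are
# illustrative, not part of this module:
#
#   import hashlib
#   onion_host = 'example.onion'
#   blocked = hashlib.md5(onion_host.encode()).hexdigest() in ahmia_blacklist_loader()
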
USER_AGENTS = _load('useragents.json')
EXTERNAL_URLS = _load('external_urls.json')
WIKIDATA_UNITS = _load('wikidata_units.json')
EXTERNAL_BANGS = _load('external_bangs.json')
ENGINE_TRAITS = _load('engine_traits.json')
LOCALES = _load('locales.json')