[mod] add utils/fetch_external_bangs.py

Based on duckduckgo bangs
Store bangs on a trie to allow autocomplete (not in this commit)
This commit is contained in:
Alexandre Flament 2021-02-22 18:03:24 +01:00
parent 606aa79e49
commit 7c1847d5f2
7 changed files with 19432 additions and 67946 deletions

View File

@ -194,7 +194,8 @@ PYLINT_FILES=\
searx/engines/google_news.py \
searx/engines/google_videos.py \
searx/engines/google_images.py \
searx/engines/mediathekviewweb.py
searx/engines/mediathekviewweb.py \
utils/fetch_external_bangs.py
test.pylint: pyenvinstall
$(call cmd,pylint,$(PYLINT_FILES))

View File

@ -2,7 +2,7 @@ import json
from pathlib import Path
__init__ = ['ENGINES_LANGUGAGES', 'CURRENCIES', 'USER_AGENTS', 'EXTERNAL_URLS', 'WIKIDATA_UNITS',
__init__ = ['ENGINES_LANGUGAGES', 'CURRENCIES', 'USER_AGENTS', 'EXTERNAL_URLS', 'WIKIDATA_UNITS', 'EXTERNAL_BANGS',
'bangs_loader', 'ahmia_blacklist_loader']
data_dir = Path(__file__).parent
@ -12,10 +12,6 @@ def load(filename):
return json.load(fd)
def bangs_loader():
return load('bangs.json')
def ahmia_blacklist_loader():
with open(str(data_dir / 'ahmia_blacklist.txt'), encoding='utf-8') as fd:
return fd.read().split()
@ -26,3 +22,4 @@ CURRENCIES = load('currencies.json')
USER_AGENTS = load('useragents.json')
EXTERNAL_URLS = load('external_urls.json')
WIKIDATA_UNITS = load('wikidata_units.json')
EXTERNAL_BANGS = load('external_bangs.json')

File diff suppressed because it is too large Load Diff

19067
searx/data/external_bangs.json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,39 +1,89 @@
from searx.data import bangs_loader
# SPDX-License-Identifier: AGPL-3.0-or-later
# bangs data coming from the following url convert to json with
# https://raw.githubusercontent.com/jivesearch/jivesearch/master/bangs/bangs.toml
# https://pseitz.github.io/toml-to-json-online-converter/
# NOTE only use the get_bang_url
bangs_data = {}
for bang in bangs_loader()['bang']:
for trigger in bang["triggers"]:
bangs_data[trigger] = {x: y for x, y in bang.items() if x != "triggers"}
from searx.data import EXTERNAL_BANGS
def get_bang_url(search_query):
def get_node(external_bangs_db, bang):
node = external_bangs_db['trie']
after = ''
before = ''
for bang_letter in bang:
after += bang_letter
if after in node and isinstance(node, dict):
node = node[after]
before += after
after = ''
return node, before, after
def get_bang_definition_and_ac(external_bangs_db, bang):
node, before, after = get_node(external_bangs_db, bang)
bang_definition = None
bang_ac_list = []
if after != '':
for k in node:
if k.startswith(after):
bang_ac_list.append(before + k)
elif isinstance(node, dict):
bang_definition = node.get('*')
bang_ac_list = [before + k for k in node.keys() if k != '*']
elif isinstance(node, str):
bang_definition = node
bang_ac_list = []
return bang_definition, bang_ac_list
def resolve_bang_definition(bang_definition, query):
url, rank = bang_definition.split(chr(1))
url = url.replace(chr(2), query)
if url.startswith('//'):
url = 'https:' + url
rank = int(rank) if len(rank) > 0 else 0
return (url, rank)
def get_bang_definition_and_autocomplete(bang, external_bangs_db=None):
global EXTERNAL_BANGS
if external_bangs_db is None:
external_bangs_db = EXTERNAL_BANGS
bang_definition, bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang)
new_autocomplete = []
current = [*bang_ac_list]
done = set()
while len(current) > 0:
bang_ac = current.pop(0)
done.add(bang_ac)
current_bang_definition, current_bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang_ac)
if current_bang_definition:
_, order = resolve_bang_definition(current_bang_definition, '')
new_autocomplete.append((bang_ac, order))
for new_bang in current_bang_ac_list:
if new_bang not in done and new_bang not in current:
current.append(new_bang)
new_autocomplete.sort(key=lambda t: (-t[1], t[0]))
new_autocomplete = list(map(lambda t: t[0], new_autocomplete))
return bang_definition, new_autocomplete
def get_bang_url(search_query, external_bangs_db=None):
"""
Redirects if the user supplied a correct bang search.
:param search_query: This is a search_query object which contains preferences and the submitted queries.
:return: None if the bang was invalid, else a string of the redirect url.
"""
global EXTERNAL_BANGS
if external_bangs_db is None:
external_bangs_db = EXTERNAL_BANGS
if search_query.external_bang:
query = search_query.query
bang = _get_bang(search_query.external_bang)
bang_definition, _ = get_bang_definition_and_ac(external_bangs_db, search_query.external_bang)
return resolve_bang_definition(bang_definition, search_query.query)[0] if bang_definition else None
if bang and query:
# TODO add region support.
bang_url = bang["regions"]["default"]
return bang_url.replace("{{{term}}}", query)
return None
def _get_bang(user_bang):
"""
Searches if the supplied user bang is available. Returns None if not found.
:param user_bang: The parsed user bang. For example yt
:return: Returns a dict with bangs data (check bangs_data.json for the structure)
"""
return bangs_data.get(user_bang)

View File

@ -0,0 +1,123 @@
from searx.external_bang import get_node, resolve_bang_definition, get_bang_url, get_bang_definition_and_autocomplete
from searx.search import SearchQuery, EngineRef
from searx.testing import SearxTestCase
TEST_DB = {
'trie': {
'exam': {
'ple': '//example.com/' + chr(2) + chr(1) + '0',
'*': '//wikipedia.org/wiki/' + chr(2) + chr(1) + '0',
},
'sea': {
'*': 'sea' + chr(2) + chr(1) + '0',
'rch': {
'*': 'search' + chr(2) + chr(1) + '0',
'ing': 'searching' + chr(2) + chr(1) + '0',
},
's': {
'on': 'season' + chr(2) + chr(1) + '0',
'capes': 'seascape' + chr(2) + chr(1) + '0',
}
},
'error': ['error in external_bangs.json']
}
}
class TestGetNode(SearxTestCase):
DB = {
'trie': {
'exam': {
'ple': 'test',
'*': 'not used',
}
}
}
def test_found(self):
node, before, after = get_node(TestGetNode.DB, 'example')
self.assertEqual(node, 'test')
self.assertEqual(before, 'example')
self.assertEqual(after, '')
def test_get_partial(self):
node, before, after = get_node(TestGetNode.DB, 'examp')
self.assertEqual(node, TestGetNode.DB['trie']['exam'])
self.assertEqual(before, 'exam')
self.assertEqual(after, 'p')
def test_not_found(self):
node, before, after = get_node(TestGetNode.DB, 'examples')
self.assertEqual(node, 'test')
self.assertEqual(before, 'example')
self.assertEqual(after, 's')
class TestResolveBangDefinition(SearxTestCase):
def test_https(self):
url, rank = resolve_bang_definition('//example.com/' + chr(2) + chr(1) + '42', 'query')
self.assertEqual(url, 'https://example.com/query')
self.assertEqual(rank, 42)
def test_http(self):
url, rank = resolve_bang_definition('http://example.com/' + chr(2) + chr(1) + '0', 'text')
self.assertEqual(url, 'http://example.com/text')
self.assertEqual(rank, 0)
class TestGetBangDefinitionAndAutocomplete(SearxTestCase):
def test_found(self):
global TEST_DB
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('exam', external_bangs_db=TEST_DB)
self.assertEqual(bang_definition, TEST_DB['trie']['exam']['*'])
self.assertEqual(new_autocomplete, ['example'])
def test_found_optimized(self):
global TEST_DB
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('example', external_bangs_db=TEST_DB)
self.assertEqual(bang_definition, TEST_DB['trie']['exam']['ple'])
self.assertEqual(new_autocomplete, [])
def test_partial(self):
global TEST_DB
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('examp', external_bangs_db=TEST_DB)
self.assertEqual(bang_definition, None)
self.assertEqual(new_autocomplete, ['example'])
def test_partial2(self):
global TEST_DB
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('sea', external_bangs_db=TEST_DB)
self.assertEqual(bang_definition, TEST_DB['trie']['sea']['*'])
self.assertEqual(new_autocomplete, ['search', 'searching', 'seascapes', 'season'])
def test_error(self):
global TEST_DB
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('error', external_bangs_db=TEST_DB)
self.assertEqual(bang_definition, None)
self.assertEqual(new_autocomplete, [])
def test_actual_data(self):
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('duckduckgo')
self.assertTrue(bang_definition.startswith('//duckduckgo.com/?q='))
self.assertEqual(new_autocomplete, [])
class TestExternalBangJson(SearxTestCase):
def test_no_external_bang_query(self):
result = get_bang_url(SearchQuery('test', engineref_list=[EngineRef('wikipedia', 'general')]))
self.assertEqual(result, None)
def test_get_bang_url(self):
global TEST_DB
url = get_bang_url(SearchQuery('test', engineref_list=[], external_bang='example'), external_bangs_db=TEST_DB)
self.assertEqual(url, 'https://example.com/test')
def test_actual_data(self):
google_url = get_bang_url(SearchQuery('test', engineref_list=[], external_bang='g'))
self.assertEqual(google_url, 'https://www.google.com/search?q=test')

161
utils/fetch_external_bangs.py Executable file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python
"""
Update searx/data/external_bangs.json using the duckduckgo bangs.
https://duckduckgo.com/newbang loads
* a javascript which provides the bang version ( https://duckduckgo.com/bv1.js )
* a JSON file which contains the bangs ( https://duckduckgo.com/bang.v260.js for example )
This script loads the javascript, then the bangs.
The javascript URL may change in the future ( for example https://duckduckgo.com/bv2.js ),
but most probably it will requires to update RE_BANG_VERSION
"""
# pylint: disable=C0116
import sys
import json
import re
from os.path import realpath, dirname, join
import requests
# set path
sys.path.append(realpath(dirname(realpath(__file__)) + '/../'))
from searx import searx_dir # pylint: disable=E0401 C0413
# from https://duckduckgo.com/newbang
URL_BV1 = 'https://duckduckgo.com/bv1.js'
RE_BANG_VERSION = re.compile(r'\/bang\.v([0-9]+)\.js')
HTTPS_COLON = 'https:'
HTTP_COLON = 'http:'
def get_bang_url():
response = requests.get(URL_BV1)
response.raise_for_status()
r = RE_BANG_VERSION.findall(response.text)
return f'https://duckduckgo.com/bang.v{r[0]}.js', r[0]
def fetch_ddg_bangs(url):
response = requests.get(url)
response.raise_for_status()
return json.loads(response.content.decode())
def merge_when_no_leaf(node):
"""Minimize the number of nodes
A -> B -> C
B is child of A
C is child of B
If there are no C equals to '*', then each C are merged into A
For example:
d -> d -> g -> * (ddg*)
-> i -> g -> * (dig*)
becomes
d -> dg -> *
-> ig -> *
"""
restart = False
if not isinstance(node, dict):
return
# create a copy of the keys so node can be modified
keys = list(node.keys())
for key in keys:
if key == '*':
continue
value = node[key]
value_keys = list(value.keys())
if '*' not in value_keys:
for value_key in value_keys:
node[key + value_key] = value[value_key]
merge_when_no_leaf(node[key + value_key])
del node[key]
restart = True
else:
merge_when_no_leaf(value)
if restart:
merge_when_no_leaf(node)
def optimize_leaf(parent, parent_key, node):
if not isinstance(node, dict):
return
if len(node) == 1 and '*' in node and parent is not None:
parent[parent_key] = node['*']
else:
for key, value in node.items():
optimize_leaf(node, key, value)
def parse_ddg_bangs(ddg_bangs):
bang_trie = {}
bang_urls = {}
for bang_definition in ddg_bangs:
# bang_list
bang_url = bang_definition['u']
if '{{{s}}}' not in bang_url:
# ignore invalid bang
continue
bang_url = bang_url.replace('{{{s}}}', chr(2))
# only for the https protocol: "https://example.com" becomes "//example.com"
if bang_url.startswith(HTTPS_COLON + '//'):
bang_url = bang_url[len(HTTPS_COLON):]
#
if bang_url.startswith(HTTP_COLON + '//') and bang_url[len(HTTP_COLON):] in bang_urls:
# if the bang_url uses the http:// protocol, and the same URL exists in https://
# then reuse the https:// bang definition. (written //example.com)
bang_def_output = bang_urls[bang_url[len(HTTP_COLON):]]
else:
# normal use case : new http:// URL or https:// URL (without "https:", see above)
bang_rank = str(bang_definition['r'])
bang_def_output = bang_url + chr(1) + bang_rank
bang_def_output = bang_urls.setdefault(bang_url, bang_def_output)
bang_urls[bang_url] = bang_def_output
# bang name
bang = bang_definition['t']
# bang_trie
t = bang_trie
for bang_letter in bang:
t = t.setdefault(bang_letter, {})
t = t.setdefault('*', bang_def_output)
# optimize the trie
merge_when_no_leaf(bang_trie)
optimize_leaf(None, None, bang_trie)
return bang_trie
def get_bangs_filename():
return join(join(searx_dir, "data"), "external_bangs.json")
if __name__ == '__main__':
bangs_url, bangs_version = get_bang_url()
print(f'fetch bangs from {bangs_url}')
output = {
'version': bangs_version,
'trie': parse_ddg_bangs(fetch_ddg_bangs(bangs_url))
}
with open(get_bangs_filename(), 'w') as fp:
json.dump(output, fp, ensure_ascii=False, indent=4)