Merge branch 'searxng:master' into elasticsearch-custom-query

This commit is contained in:
frob 2024-11-29 02:32:55 +01:00 committed by GitHub
commit 82d1544a6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
341 changed files with 29669 additions and 12534 deletions

View file

@ -0,0 +1,229 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""`Adobe Stock`_ is a service that gives access to millions of royalty-free
assets. Assets types include photos, vectors, illustrations, templates, 3D
assets, videos, motion graphics templates and audio tracks.
.. Adobe Stock: https://stock.adobe.com/
Configuration
=============
The engine has the following mandatory setting:
- SearXNG's :ref:`engine categories`
- Adobe-Stock's :py:obj:`adobe_order`
- Adobe-Stock's :py:obj:`adobe_content_types`
.. code:: yaml
- name: adobe stock
engine: adobe_stock
shortcut: asi
categories: [images]
adobe_order: relevance
adobe_content_types: ["photo", "illustration", "zip_vector", "template", "3d", "image"]
- name: adobe stock video
engine: adobe_stock
network: adobe stock
shortcut: asi
categories: [videos]
adobe_order: relevance
adobe_content_types: ["video"]
Implementation
==============
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime, timedelta
from urllib.parse import urlencode
import isodate
if TYPE_CHECKING:
import logging
logger: logging.Logger
about = {
"website": "https://stock.adobe.com/",
"wikidata_id": "Q5977430",
"official_api_documentation": None,
"use_official_api": False,
"require_api_key": False,
"results": "JSON",
}
categories = []
paging = True
send_accept_language_header = True
results_per_page = 10
base_url = "https://stock.adobe.com"
adobe_order: str = ""
"""Sort order, can be one of:
- ``relevance`` or
- ``featured`` or
- ``creation`` (most recent) or
- ``nb_downloads`` (number of downloads)
"""
ADOBE_VALID_TYPES = ["photo", "illustration", "zip_vector", "video", "template", "3d", "audio", "image"]
adobe_content_types: list = []
"""A list of of content types. The following content types are offered:
- Images: ``image``
- Videos: ``video``
- Templates: ``template``
- 3D: ``3d``
- Audio ``audio``
Additional subcategories:
- Photos: ``photo``
- Illustrations: ``illustration``
- Vectors: ``zip_vector`` (Vectors),
"""
# Do we need support for "free_collection" and "include_stock_enterprise"?
def init(_):
if not categories:
raise ValueError("adobe_stock engine: categories is unset")
# adobe_order
if not adobe_order:
raise ValueError("adobe_stock engine: adobe_order is unset")
if adobe_order not in ["relevance", "featured", "creation", "nb_downloads"]:
raise ValueError(f"unsupported adobe_order: {adobe_order}")
# adobe_content_types
if not adobe_content_types:
raise ValueError("adobe_stock engine: adobe_content_types is unset")
if isinstance(adobe_content_types, list):
for t in adobe_content_types:
if t not in ADOBE_VALID_TYPES:
raise ValueError("adobe_stock engine: adobe_content_types: '%s' is invalid" % t)
else:
raise ValueError(
"adobe_stock engine: adobe_content_types must be a list of strings not %s" % type(adobe_content_types)
)
def request(query, params):
args = {
"k": query,
"limit": results_per_page,
"order": adobe_order,
"search_page": params["pageno"],
"search_type": "pagination",
}
for content_type in ADOBE_VALID_TYPES:
args[f"filters[content_type:{content_type}]"] = 1 if content_type in adobe_content_types else 0
params["url"] = f"{base_url}/de/Ajax/Search?{urlencode(args)}"
# headers required to bypass bot-detection
if params["searxng_locale"] == "all":
params["headers"]["Accept-Language"] = "en-US,en;q=0.5"
return params
def parse_image_item(item):
return {
"template": "images.html",
"url": item["content_url"],
"title": item["title"],
"content": item["asset_type"],
"img_src": item["content_thumb_extra_large_url"],
"thumbnail_src": item["thumbnail_url"],
"resolution": f"{item['content_original_width']}x{item['content_original_height']}",
"img_format": item["format"],
"author": item["author"],
}
def parse_video_item(item):
# in video items, the title is more or less a "content description", we try
# to reduce the lenght of the title ..
title = item["title"]
content = ""
if "." in title.strip()[:-1]:
content = title
title = title.split(".", 1)[0]
elif "," in title:
content = title
title = title.split(",", 1)[0]
elif len(title) > 50:
content = title
title = ""
for w in content.split(" "):
title += f" {w}"
if len(title) > 50:
title = title.strip() + "\u2026"
break
return {
"template": "videos.html",
"url": item["content_url"],
"title": title,
"content": content,
# https://en.wikipedia.org/wiki/ISO_8601#Durations
"length": isodate.parse_duration(item["time_duration"]),
"publishedDate": datetime.strptime(item["creation_date"], "%Y-%m-%d"),
"thumbnail": item["thumbnail_url"],
"iframe_src": item["video_small_preview_url"],
"metadata": item["asset_type"],
}
def parse_audio_item(item):
audio_data = item["audio_data"]
content = audio_data.get("description") or ""
if audio_data.get("album"):
content = audio_data["album"] + " - " + content
return {
"url": item["content_url"],
"title": item["title"],
"content": content,
# "thumbnail": base_url + item["thumbnail_url"],
"iframe_src": audio_data["preview"]["url"],
"publishedDate": datetime.fromisoformat(audio_data["release_date"]) if audio_data["release_date"] else None,
"length": timedelta(seconds=round(audio_data["duration"] / 1000)) if audio_data["duration"] else None,
"author": item.get("artist_name"),
}
def response(resp):
results = []
json_resp = resp.json()
if isinstance(json_resp["items"], list):
return None
for item in json_resp["items"].values():
if item["asset_type"].lower() in ["image", "premium-image", "illustration", "vector"]:
result = parse_image_item(item)
elif item["asset_type"].lower() == "video":
result = parse_video_item(item)
elif item["asset_type"].lower() == "audio":
result = parse_audio_item(item)
else:
logger.error("no handle for %s --> %s", item["asset_type"], item)
continue
results.append(result)
return results

View file

@ -0,0 +1,83 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""`Alpine Linux binary packages`_. `Alpine Linux`_ is a Linux-based operation
system designed to be small, simple and secure. Contrary to many other Linux
distributions, it uses musl, BusyBox and OpenRC. Alpine is mostly used on
servers and for Docker images.
.. _Alpine Linux binary packages: https://pkgs.alpinelinux.org
.. _Alpine Linux: https://www.alpinelinux.org
"""
import re
from urllib.parse import urlencode
from lxml import html
from dateutil import parser
from searx.utils import eval_xpath, eval_xpath_list, extract_text
about = {
'website': 'https://www.alpinelinux.org',
'wikidata_id': 'Q4033826',
'use_official_api': False,
'official_api_documentation': None,
'require_api_key': False,
'results': 'HTML',
}
paging = True
categories = ['packages', 'it']
base_url = "https://pkgs.alpinelinux.org"
alpine_arch = 'x86_64'
"""Kernel architecture: ``x86_64``, ``x86``, ``aarch64``, ``armhf``,
``ppc64le``, ``s390x``, ``armv7`` or ``riscv64``"""
ARCH_RE = re.compile("x86_64|x86|aarch64|armhf|ppc64le|s390x|armv7|riscv64")
"""Regular expression to match supported architectures in the query string."""
def request(query, params):
query_arch = ARCH_RE.search(query)
if query_arch:
query_arch = query_arch.group(0)
query = query.replace(query_arch, '').strip()
args = {
# use wildcards to match more than just packages with the exact same
# name as the query
'name': f"*{query}*",
'page': params['pageno'],
'arch': query_arch or alpine_arch,
}
params['url'] = f"{base_url}/packages?{urlencode(args)}"
return params
def response(resp):
results = []
doc = html.fromstring(resp.text)
for result in eval_xpath_list(doc, "//table/tbody/tr"):
if len(result.xpath("./td")) < 9:
# skip non valid entries in the result table
# e.g the "No item found..." message
continue
results.append(
{
'template': 'packages.html',
'url': base_url + extract_text(eval_xpath(result, './td[contains(@class, "package")]/a/@href')),
'title': extract_text(eval_xpath(result, './td[contains(@class, "package")]')),
'package_name': extract_text(eval_xpath(result, './td[contains(@class, "package")]')),
'publishedDate': parser.parse(extract_text(eval_xpath(result, './td[contains(@class, "bdate")]'))),
'version': extract_text(eval_xpath(result, './td[contains(@class, "version")]')),
'homepage': extract_text(eval_xpath(result, './td[contains(@class, "url")]/a/@href')),
'maintainer': extract_text(eval_xpath(result, './td[contains(@class, "maintainer")]')),
'license_name': extract_text(eval_xpath(result, './td[contains(@class, "license")]')),
'tags': [extract_text(eval_xpath(result, './td[contains(@class, "repo")]'))],
}
)
return results

View file

@ -34,10 +34,10 @@ Implementations
"""
from typing import List, Dict, Any, Optional
from urllib.parse import quote
from urllib.parse import urlencode
from lxml import html
from searx.utils import extract_text, eval_xpath, eval_xpath_list
from searx.utils import extract_text, eval_xpath, eval_xpath_getindex, eval_xpath_list
from searx.enginelib.traits import EngineTraits
from searx.data import ENGINE_TRAITS
@ -53,7 +53,7 @@ about: Dict[str, Any] = {
# engine dependent config
categories: List[str] = ["files"]
paging: bool = False
paging: bool = True
# search-url
base_url: str = "https://annas-archive.org"
@ -99,9 +99,18 @@ def init(engine_settings=None): # pylint: disable=unused-argument
def request(query, params: Dict[str, Any]) -> Dict[str, Any]:
q = quote(query)
lang = traits.get_language(params["language"], traits.all_locale) # type: ignore
params["url"] = base_url + f"/search?lang={lang or ''}&content={aa_content}&ext={aa_ext}&sort={aa_sort}&q={q}"
args = {
'lang': lang,
'content': aa_content,
'ext': aa_ext,
'sort': aa_sort,
'q': query,
'page': params['pageno'],
}
# filter out None and empty values
filtered_args = dict((k, v) for k, v in args.items() if v)
params["url"] = f"{base_url}/search?{urlencode(filtered_args)}"
return params
@ -128,12 +137,12 @@ def response(resp) -> List[Dict[str, Optional[str]]]:
def _get_result(item):
return {
'template': 'paper.html',
'url': base_url + item.xpath('./@href')[0],
'url': base_url + extract_text(eval_xpath_getindex(item, './@href', 0)),
'title': extract_text(eval_xpath(item, './/h3/text()[1]')),
'publisher': extract_text(eval_xpath(item, './/div[contains(@class, "text-sm")]')),
'authors': [extract_text(eval_xpath(item, './/div[contains(@class, "italic")]'))],
'content': extract_text(eval_xpath(item, './/div[contains(@class, "text-xs")]')),
'thumbnail': item.xpath('.//img/@src')[0],
'thumbnail': extract_text(eval_xpath_getindex(item, './/img/@src', 0, default=None), allow_none=True),
}
@ -184,3 +193,8 @@ def fetch_traits(engine_traits: EngineTraits):
for x in eval_xpath_list(dom, "//form//select[@name='sort']//option"):
engine_traits.custom['sort'].append(x.get("value"))
# for better diff; sort the persistence of these traits
engine_traits.custom['content'].sort()
engine_traits.custom['ext'].sort()
engine_traits.custom['sort'].sort()

View file

@ -31,7 +31,7 @@ paging = True
number_of_results = 10
# shortcuts for advanced search
shorcut_dict = {
shortcut_dict = {
# user-friendly keywords
'format:': 'dcformat:',
'author:': 'dccreator:',
@ -55,7 +55,7 @@ shorcut_dict = {
def request(query, params):
# replace shortcuts with API advanced search keywords
for key, val in shorcut_dict.items():
for key, val in shortcut_dict.items():
query = re.sub(key, val, query)
# basic search

View file

@ -9,6 +9,8 @@ import string
from urllib.parse import urlencode
from datetime import datetime, timedelta
from searx import utils
# Engine metadata
about = {
"website": "https://www.bilibili.com",
@ -56,6 +58,8 @@ def request(query, params):
# Format the video duration
def format_duration(duration):
if not ":" in duration:
return None
minutes, seconds = map(int, duration.split(":"))
total_seconds = minutes * 60 + seconds
@ -70,7 +74,7 @@ def response(resp):
results = []
for item in search_res.get("data", {}).get("result", []):
title = item["title"]
title = utils.html_to_text(item["title"])
url = item["arcurl"]
thumbnail = item["pic"]
description = item["description"]

View file

@ -10,7 +10,7 @@ On the `preference page`_ Bing offers a lot of languages an regions (see section
LANGUAGE and COUNTRY/REGION). The Language is the language of the UI, we need
in SearXNG to get the translations of data such as *"published last week"*.
There is a description of the offical search-APIs_, unfortunately this is not
There is a description of the official search-APIs_, unfortunately this is not
the API we can use or that bing itself would use. You can look up some things
in the API to get a better picture of bing, but the value specifications like
the market codes are usually outdated or at least no longer used by bing itself.
@ -91,7 +91,7 @@ def request(query, params):
page = params.get('pageno', 1)
query_params = {
'q': query,
# if arg 'pq' is missed, somtimes on page 4 we get results from page 1,
# if arg 'pq' is missed, sometimes on page 4 we get results from page 1,
# don't ask why it is only sometimes / its M$ and they have never been
# deterministic ;)
'pq': query,
@ -177,7 +177,7 @@ def response(resp):
logger.debug('result error :\n%s', e)
if result_len and _page_offset(resp.search_params.get("pageno", 0)) > result_len:
# Avoid reading more results than avalaible.
# Avoid reading more results than available.
# For example, if there is 100 results from some search and we try to get results from 120 to 130,
# Bing will send back the results from 0 to 10 and no error.
# If we compare results count with the first parameter of the request we can avoid this "invalid" results.

View file

@ -99,7 +99,7 @@ def response(resp):
'url': metadata['purl'],
'thumbnail_src': metadata['turl'],
'img_src': metadata['murl'],
'content': metadata['desc'],
'content': metadata.get('desc'),
'title': title,
'source': source,
'resolution': img_format[0],

View file

@ -123,7 +123,9 @@ def response(resp):
thumbnail = None
imagelink = eval_xpath_getindex(newsitem, './/a[@class="imagelink"]//img', 0, None)
if imagelink is not None:
thumbnail = 'https://www.bing.com/' + imagelink.attrib.get('src')
thumbnail = imagelink.attrib.get('src')
if not thumbnail.startswith("https://www.bing.com"):
thumbnail = 'https://www.bing.com/' + thumbnail
results.append(
{

View file

@ -123,7 +123,6 @@ from typing import Any, TYPE_CHECKING
from urllib.parse import (
urlencode,
urlparse,
parse_qs,
)
from dateutil import parser
@ -137,6 +136,7 @@ from searx.utils import (
eval_xpath_list,
eval_xpath_getindex,
js_variable_to_python,
get_embeded_stream_url,
)
from searx.enginelib.traits import EngineTraits
@ -311,7 +311,7 @@ def _parse_search(resp):
# In my tests a video tag in the WEB search was most often not a
# video, except the ones from youtube ..
iframe_src = _get_iframe_src(url)
iframe_src = get_embeded_stream_url(url)
if iframe_src:
item['iframe_src'] = iframe_src
item['template'] = 'videos.html'
@ -328,15 +328,6 @@ def _parse_search(resp):
return result_list
def _get_iframe_src(url):
parsed_url = urlparse(url)
if parsed_url.path == '/watch' and parsed_url.query:
video_id = parse_qs(parsed_url.query).get('v', []) # type: ignore
if video_id:
return 'https://www.youtube-nocookie.com/embed/' + video_id[0] # type: ignore
return None
def _parse_news(json_resp):
result_list = []
@ -392,7 +383,7 @@ def _parse_videos(json_resp):
if result['thumbnail'] is not None:
item['thumbnail'] = result['thumbnail']['src']
iframe_src = _get_iframe_src(url)
iframe_src = get_embeded_stream_url(url)
if iframe_src:
item['iframe_src'] = iframe_src
@ -426,14 +417,15 @@ def fetch_traits(engine_traits: EngineTraits):
print("ERROR: response from Brave is not OK.")
dom = html.fromstring(resp.text) # type: ignore
for option in dom.xpath('//div[@id="language-select"]//option'):
for option in dom.xpath('//section//option[@value="en-us"]/../option'):
ui_lang = option.get('value')
try:
if '-' in ui_lang:
l = babel.Locale.parse(ui_lang, sep='-')
if l.territory:
sxng_tag = region_tag(babel.Locale.parse(ui_lang, sep='-'))
else:
sxng_tag = language_tag(babel.Locale.parse(ui_lang))
sxng_tag = language_tag(babel.Locale.parse(ui_lang, sep='-'))
except babel.UnknownLocaleError:
print("ERROR: can't determine babel locale of Brave's (UI) language %s" % ui_lang)
@ -453,7 +445,7 @@ def fetch_traits(engine_traits: EngineTraits):
if not resp.ok: # type: ignore
print("ERROR: response from Brave is not OK.")
country_js = resp.text[resp.text.index("options:{all") + len('options:') :]
country_js = resp.text[resp.text.index("options:{all") + len('options:') :] # type: ignore
country_js = country_js[: country_js.index("},k={default")]
country_tags = js_variable_to_python(country_js)

View file

@ -54,7 +54,6 @@ def response(resp):
excerpt = result.xpath('.//div[@class="torrent_excerpt"]')[0]
content = html.tostring(excerpt, encoding='unicode', method='text', with_tail=False)
# it is better to emit <br/> instead of |, but html tags are verboten
content = content.strip().replace('\n', ' | ')
content = ' '.join(content.split())

View file

@ -0,0 +1,68 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Cloudflare AI engine"""
from json import loads, dumps
from searx.exceptions import SearxEngineAPIException
about = {
"website": 'https://ai.cloudflare.com',
"wikidata_id": None,
"official_api_documentation": 'https://developers.cloudflare.com/workers-ai',
"use_official_api": True,
"require_api_key": True,
"results": 'JSON',
}
cf_account_id = ''
cf_ai_api = ''
cf_ai_gateway = ''
cf_ai_model = ''
cf_ai_model_display_name = 'Cloudflare AI'
# Assistant messages hint to the AI about the desired output format. Not all models support this role.
cf_ai_model_assistant = 'Keep your answers as short and effective as possible.'
# System messages define the AI's personality. You can use them to set rules and how you expect the AI to behave.
cf_ai_model_system = 'You are a self-aware language model who is honest and direct about any question from the user.'
def request(query, params):
params['query'] = query
params['url'] = f'https://gateway.ai.cloudflare.com/v1/{cf_account_id}/{cf_ai_gateway}/workers-ai/{cf_ai_model}'
params['method'] = 'POST'
params['headers']['Authorization'] = f'Bearer {cf_ai_api}'
params['headers']['Content-Type'] = 'application/json'
params['data'] = dumps(
{
'messages': [
{'role': 'assistant', 'content': cf_ai_model_assistant},
{'role': 'system', 'content': cf_ai_model_system},
{'role': 'user', 'content': params['query']},
]
}
).encode('utf-8')
return params
def response(resp):
results = []
json = loads(resp.text)
if 'error' in json:
raise SearxEngineAPIException('Cloudflare AI error: ' + json['error'])
if 'result' in json:
results.append(
{
'content': json['result']['response'],
'infobox': cf_ai_model_display_name,
}
)
return results

View file

@ -10,6 +10,8 @@ engine offers some additional settings:
- :py:obj:`api_order`
- :py:obj:`search_endpoint`
- :py:obj:`show_avatar`
- :py:obj:`api_key`
- :py:obj:`api_username`
Example
=======
@ -27,6 +29,20 @@ for the ``paddling.com`` forum:
categories: ['social media', 'sports']
show_avatar: true
If the forum is private, you need to add an API key and username for the search:
.. code:: yaml
- name: paddling
engine: discourse
shortcut: paddle
base_url: 'https://forums.paddling.com/'
api_order: views
categories: ['social media', 'sports']
show_avatar: true
api_key: '<KEY>'
api_username: 'system'
Implementations
===============
@ -65,6 +81,12 @@ api_order = 'likes'
show_avatar = False
"""Show avatar of the user who send the post."""
api_key = ''
"""API key of the Discourse forum."""
api_username = ''
"""API username of the Discourse forum."""
paging = True
time_range_support = True
@ -98,6 +120,12 @@ def request(query, params):
'X-Requested-With': 'XMLHttpRequest',
}
if api_key != '':
params['headers']['Api-Key'] = api_key
if api_username != '':
params['headers']['Api-Username'] = api_username
return params

View file

@ -1,12 +1,14 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
DuckDuckGo Lite
~~~~~~~~~~~~~~~
DuckDuckGo WEB
~~~~~~~~~~~~~~
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import re
from urllib.parse import urlencode
from urllib.parse import urlencode, quote_plus
import json
import babel
import lxml.html
@ -18,13 +20,13 @@ from searx import (
)
from searx.utils import (
eval_xpath,
eval_xpath_getindex,
extr,
extract_text,
)
from searx.network import get # see https://github.com/searxng/searxng/issues/762
from searx import redisdb
from searx.enginelib.traits import EngineTraits
from searx.utils import extr
from searx.exceptions import SearxEngineCaptchaException
if TYPE_CHECKING:
import logging
@ -42,7 +44,7 @@ about = {
}
send_accept_language_header = True
"""DuckDuckGo-Lite tries to guess user's prefered language from the HTTP
"""DuckDuckGo-Lite tries to guess user's preferred language from the HTTP
``Accept-Language``. Optional the user can select a region filter (but not a
language).
"""
@ -53,47 +55,37 @@ paging = True
time_range_support = True
safesearch = True # user can't select but the results are filtered
url = 'https://lite.duckduckgo.com/lite/'
# url_ping = 'https://duckduckgo.com/t/sl_l'
url = "https://html.duckduckgo.com/html"
time_range_dict = {'day': 'd', 'week': 'w', 'month': 'm', 'year': 'y'}
form_data = {'v': 'l', 'api': 'd.js', 'o': 'json'}
__CACHE = []
def cache_vqd(query, value):
def _cache_key(query: str, region: str):
return 'SearXNG_ddg_web_vqd' + redislib.secret_hash(f"{query}//{region}")
def cache_vqd(query: str, region: str, value: str):
"""Caches a ``vqd`` value from a query."""
c = redisdb.client()
if c:
logger.debug("cache vqd value: %s", value)
key = 'SearXNG_ddg_web_vqd' + redislib.secret_hash(query)
c.set(key, value, ex=600)
logger.debug("VALKEY cache vqd value: %s (%s)", value, region)
c.set(_cache_key(query, region), value, ex=600)
else:
logger.debug("MEM cache vqd value: %s (%s)", value, region)
if len(__CACHE) > 100: # cache vqd from last 100 queries
__CACHE.pop(0)
__CACHE.append((_cache_key(query, region), value))
def get_vqd(query):
"""Returns the ``vqd`` that fits to the *query*. If there is no ``vqd`` cached
(:py:obj:`cache_vqd`) the query is sent to DDG to get a vqd value from the
response.
def get_vqd(query: str, region: str, force_request: bool = False):
"""Returns the ``vqd`` that fits to the *query*.
.. hint::
If an empty string is returned there are no results for the ``query`` and
therefore no ``vqd`` value.
DDG's bot detection is sensitive to the ``vqd`` value. For some search terms
(such as extremely long search terms that are often sent by bots), no ``vqd``
value can be determined.
If SearXNG cannot determine a ``vqd`` value, then no request should go out
to DDG:
A request with a wrong ``vqd`` value leads to DDG temporarily putting
SearXNG's IP on a block list.
Requests from IPs in this block list run into timeouts.
Not sure, but it seems the block list is a sliding window: to get my IP rid
from the bot list I had to cool down my IP for 1h (send no requests from
that IP to DDG).
:param query: The query term
:param region: DDG's region code
:param force_request: force a request to get a vqd value from DDG
TL;DR; the ``vqd`` value is needed to pass DDG's bot protection and is used
by all request to DDG:
@ -104,29 +96,47 @@ def get_vqd(query):
- DuckDuckGo Videos: ``https://duckduckgo.com/v.js??q=...&vqd=...``
- DuckDuckGo News: ``https://duckduckgo.com/news.js??q=...&vqd=...``
DDG's bot detection is sensitive to the ``vqd`` value. For some search terms
(such as extremely long search terms that are often sent by bots), no ``vqd``
value can be determined.
If SearXNG cannot determine a ``vqd`` value, then no request should go out
to DDG.
.. attention::
A request with a wrong ``vqd`` value leads to DDG temporarily putting
SearXNG's IP on a block list.
Requests from IPs in this block list run into timeouts. Not sure, but it
seems the block list is a sliding window: to get my IP rid from the bot list
I had to cool down my IP for 1h (send no requests from that IP to DDG).
"""
value = None
key = _cache_key(query, region)
c = redisdb.client()
if c:
key = 'SearXNG_ddg_web_vqd' + redislib.secret_hash(query)
value = c.get(key)
if value or value == b'':
value = value.decode('utf-8')
logger.debug("re-use cached vqd value: %s", value)
value = value.decode('utf-8') # type: ignore
logger.debug("re-use CACHED vqd value: %s", value)
return value
query_url = 'https://duckduckgo.com/?' + urlencode({'q': query})
res = get(query_url)
doc = lxml.html.fromstring(res.text)
for script in doc.xpath("//script[@type='text/javascript']"):
script = script.text
if 'vqd="' in script:
value = extr(script, 'vqd="', '"')
break
logger.debug("new vqd value: '%s'", value)
if value is not None:
cache_vqd(query, value)
return value
for k, value in __CACHE:
if k == key:
logger.debug("MEM re-use CACHED vqd value: %s", value)
return value
if force_request:
resp = get(f'https://duckduckgo.com/?q={quote_plus(query)}')
if resp.status_code == 200: # type: ignore
value = extr(resp.text, 'vqd="', '"') # type: ignore
if value:
logger.debug("vqd value from DDG request: %s", value)
cache_vqd(query, region, value)
return value
return None
def get_ddg_lang(eng_traits: EngineTraits, sxng_locale, default='en_US'):
@ -154,9 +164,10 @@ def get_ddg_lang(eng_traits: EngineTraits, sxng_locale, default='en_US'):
.. hint::
`DDG-lite <https://lite.duckduckgo.com/lite>`__ does not offer a language
selection to the user, only a region can be selected by the user
(``eng_region`` from the example above). DDG-lite stores the selected
`DDG-lite <https://lite.duckduckgo.com/lite>`__ and the *no Javascript*
page https://html.duckduckgo.com/html do not offer a language selection
to the user, only a region can be selected by the user (``eng_region``
from the example above). DDG-lite and *no Javascript* store the selected
region in a cookie::
params['cookies']['kl'] = eng_region # 'ar-es'
@ -240,10 +251,27 @@ def request(query, params):
query = quote_ddg_bangs(query)
# request needs a vqd argument
vqd = get_vqd(query)
if len(query) >= 500:
# DDG does not accept queries with more than 499 chars
params["url"] = None
return
# Advanced search syntax ends in CAPTCHA
# https://duckduckgo.com/duckduckgo-help-pages/results/syntax/
query = " ".join(
[
x.removeprefix("site:").removeprefix("intitle:").removeprefix("inurl:").removeprefix("filetype:")
for x in query.split()
]
)
eng_region: str = traits.get_region(params['searxng_locale'], traits.all_locale) # type: ignore
if eng_region == "wt-wt":
# https://html.duckduckgo.com/html sets an empty value for "all".
eng_region = ""
params['data']['kl'] = eng_region
params['cookies']['kl'] = eng_region
eng_region = traits.get_region(params['searxng_locale'], traits.all_locale)
# eng_lang = get_ddg_lang(traits, params['searxng_locale'])
params['url'] = url
@ -251,45 +279,79 @@ def request(query, params):
params['data']['q'] = query
# The API is not documented, so we do some reverse engineering and emulate
# what https://lite.duckduckgo.com/lite/ does when you press "next Page"
# link again and again ..
# what https://html.duckduckgo.com/html does when you press "next Page" link
# again and again ..
params['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
params['data']['vqd'] = vqd
# initial page does not have an offset
params['headers']['Sec-Fetch-Dest'] = "document"
params['headers']['Sec-Fetch-Mode'] = "navigate" # at least this one is used by ddg's bot detection
params['headers']['Sec-Fetch-Site'] = "same-origin"
params['headers']['Sec-Fetch-User'] = "?1"
# Form of the initial search page does have empty values in the form
if params['pageno'] == 1:
params['data']['b'] = ""
params['data']['df'] = ''
if params['time_range'] in time_range_dict:
params['data']['df'] = time_range_dict[params['time_range']]
params['cookies']['df'] = time_range_dict[params['time_range']]
if params['pageno'] == 2:
# second page does have an offset of 20
offset = (params['pageno'] - 1) * 20
params['data']['s'] = offset
params['data']['dc'] = offset + 1
elif params['pageno'] > 2:
# third and following pages do have an offset of 20 + n*50
offset = 20 + (params['pageno'] - 2) * 50
params['data']['s'] = offset
params['data']['dc'] = offset + 1
# initial page does not have additional data in the input form
if params['pageno'] > 1:
# initial page does not have these additional data in the input form
params['data']['o'] = form_data.get('o', 'json')
params['data']['api'] = form_data.get('api', 'd.js')
params['data']['nextParams'] = form_data.get('nextParams', '')
params['data']['v'] = form_data.get('v', 'l')
params['headers']['Referer'] = 'https://lite.duckduckgo.com/'
params['headers']['Referer'] = url
params['data']['kl'] = eng_region
params['cookies']['kl'] = eng_region
vqd = get_vqd(query, eng_region, force_request=False)
params['data']['df'] = ''
if params['time_range'] in time_range_dict:
params['data']['df'] = time_range_dict[params['time_range']]
params['cookies']['df'] = time_range_dict[params['time_range']]
# Certain conditions must be met in order to call up one of the
# following pages ...
if vqd:
params['data']['vqd'] = vqd # follow up pages / requests needs a vqd argument
else:
# Don't try to call follow up pages without a vqd value. DDG
# recognizes this as a request from a bot. This lowers the
# reputation of the SearXNG IP and DDG starts to activate CAPTCHAs.
params["url"] = None
return
if params['searxng_locale'].startswith("zh"):
# Some locales (at least China) do not have a "next page" button and ddg
# will return a HTTP/2 403 Forbidden for a request of such a page.
params["url"] = None
return
logger.debug("param data: %s", params['data'])
logger.debug("param cookies: %s", params['cookies'])
return params
def is_ddg_captcha(dom):
"""In case of CAPTCHA ddg response its own *not a Robot* dialog and is not
redirected to a CAPTCHA page."""
return bool(eval_xpath(dom, "//form[@id='challenge-form']"))
def response(resp):
@ -300,38 +362,40 @@ def response(resp):
results = []
doc = lxml.html.fromstring(resp.text)
result_table = eval_xpath(doc, '//html/body/form/div[@class="filters"]/table')
if is_ddg_captcha(doc):
# set suspend time to zero is OK --> ddg does not block the IP
raise SearxEngineCaptchaException(suspended_time=0, message=f"CAPTCHA ({resp.search_params['data'].get('kl')})")
if len(result_table) == 2:
# some locales (at least China) does not have a "next page" button and
# the layout of the HTML tables is different.
result_table = result_table[1]
elif not len(result_table) >= 3:
# no more results
return []
else:
result_table = result_table[2]
# update form data from response
form = eval_xpath(doc, '//html/body/form/div[@class="filters"]/table//input/..')
if len(form):
form = eval_xpath(doc, '//input[@name="vqd"]/..')
if len(form):
# some locales (at least China) does not have a "next page" button
form = form[0]
form_vqd = eval_xpath(form, '//input[@name="vqd"]/@value')[0]
form = form[0]
form_data['v'] = eval_xpath(form, '//input[@name="v"]/@value')[0]
form_data['api'] = eval_xpath(form, '//input[@name="api"]/@value')[0]
form_data['o'] = eval_xpath(form, '//input[@name="o"]/@value')[0]
logger.debug('form_data: %s', form_data)
cache_vqd(resp.search_params['data']['q'], resp.search_params['data']['kl'], form_vqd)
tr_rows = eval_xpath(result_table, './/tr')
# In the last <tr> is the form of the 'previous/next page' links
tr_rows = tr_rows[:-1]
# just select "web-result" and ignore results of class "result--ad result--ad--small"
for div_result in eval_xpath(doc, '//div[@id="links"]/div[contains(@class, "web-result")]'):
len_tr_rows = len(tr_rows)
offset = 0
item = {}
title = eval_xpath(div_result, './/h2/a')
if not title:
# this is the "No results." item in the result list
continue
item["title"] = extract_text(title)
item["url"] = eval_xpath(div_result, './/h2/a/@href')[0]
item["content"] = extract_text(eval_xpath(div_result, './/a[contains(@class, "result__snippet")]')[0])
zero_click_info_xpath = '//html/body/form/div/table[2]/tr[2]/td/text()'
zero_click = extract_text(eval_xpath(doc, zero_click_info_xpath)).strip()
results.append(item)
if zero_click and "Your IP address is" not in zero_click:
zero_click_info_xpath = '//div[@id="zero_click_abstract"]'
zero_click = extract_text(eval_xpath(doc, zero_click_info_xpath)).strip() # type: ignore
if zero_click and (
"Your IP address is" not in zero_click
and "Your user agent:" not in zero_click
and "URL Decoded:" not in zero_click
):
current_query = resp.search_params["data"].get("q")
results.append(
@ -341,33 +405,6 @@ def response(resp):
}
)
while len_tr_rows >= offset + 4:
# assemble table rows we need to scrap
tr_title = tr_rows[offset]
tr_content = tr_rows[offset + 1]
offset += 4
# ignore sponsored Adds <tr class="result-sponsored">
if tr_content.get('class') == 'result-sponsored':
continue
a_tag = eval_xpath_getindex(tr_title, './/td//a[@class="result-link"]', 0, None)
if a_tag is None:
continue
td_content = eval_xpath_getindex(tr_content, './/td[@class="result-snippet"]', 0, None)
if td_content is None:
continue
results.append(
{
'title': a_tag.text_content(),
'content': extract_text(td_content),
'url': a_tag.get('href'),
}
)
return results
@ -375,7 +412,7 @@ def fetch_traits(engine_traits: EngineTraits):
"""Fetch languages & regions from DuckDuckGo.
SearXNG's ``all`` locale maps DuckDuckGo's "Alle regions" (``wt-wt``).
DuckDuckGo's language "Browsers prefered language" (``wt_WT``) makes no
DuckDuckGo's language "Browsers preferred language" (``wt_WT``) makes no
sense in a SearXNG request since SearXNG's ``all`` will not add a
``Accept-Language`` HTTP header. The value in ``engine_traits.all_locale``
is ``wt-wt`` (the region).
@ -405,7 +442,7 @@ def fetch_traits(engine_traits: EngineTraits):
if not resp.ok: # type: ignore
print("ERROR: response from DuckDuckGo is not OK.")
js_code = extr(resp.text, 'regions:', ',snippetLengths')
js_code = extr(resp.text, 'regions:', ',snippetLengths') # type: ignore
regions = json.loads(js_code)
for eng_tag, name in regions.items():
@ -439,7 +476,7 @@ def fetch_traits(engine_traits: EngineTraits):
engine_traits.custom['lang_region'] = {}
js_code = extr(resp.text, 'languages:', ',regions')
js_code = extr(resp.text, 'languages:', ',regions') # type: ignore
languages = js_variable_to_python(js_code)
for eng_lang, name in languages.items():

View file

@ -4,15 +4,15 @@ DuckDuckGo Extra (images, videos, news)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.parse import urlencode
from searx.utils import get_embeded_stream_url
from searx.engines.duckduckgo import fetch_traits # pylint: disable=unused-import
from searx.engines.duckduckgo import (
get_ddg_lang,
get_vqd,
)
from searx.engines.duckduckgo import get_ddg_lang, get_vqd
from searx.enginelib.traits import EngineTraits
if TYPE_CHECKING:
@ -47,15 +47,16 @@ search_path_map = {'images': 'i', 'videos': 'v', 'news': 'news'}
def request(query, params):
eng_region: str = traits.get_region(params['searxng_locale'], traits.all_locale) # type: ignore
# request needs a vqd argument
vqd = get_vqd(query)
vqd = get_vqd(query, eng_region, force_request=True)
if not vqd:
# some search terms do not have results and therefore no vqd value
params['url'] = None
return params
eng_region = traits.get_region(params['searxng_locale'], traits.all_locale)
eng_lang = get_ddg_lang(traits, params['searxng_locale'])
args = {
@ -85,6 +86,12 @@ def request(query, params):
params['url'] = f'https://duckduckgo.com/{search_path_map[ddg_category]}.js?{urlencode(args)}'
# sending these two headers prevents rate limiting for the query
params['headers'] = {
'Referer': 'https://duckduckgo.com/',
'X-Requested-With': 'XMLHttpRequest',
}
return params
@ -108,7 +115,7 @@ def _video_result(result):
'title': result['title'],
'content': result['description'],
'thumbnail': result['images'].get('small') or result['images'].get('medium'),
'iframe_src': result['embed_url'],
'iframe_src': get_embeded_stream_url(result['content']),
'source': result['provider'],
'length': result['duration'],
'metadata': result.get('uploader'),

View file

@ -35,8 +35,8 @@ def response(resp):
results = []
for item in search_res:
img = 'https://findthatmeme.us-southeast-1.linodeobjects.com/' + item['image_path']
thumb = 'https://findthatmeme.us-southeast-1.linodeobjects.com/thumb/' + item.get('thumbnail', '')
img = 'https://s3.thehackerblog.com/findthatmeme/' + item['image_path']
thumb = 'https://s3.thehackerblog.com/findthatmeme/thumb/' + item.get('thumbnail', '')
date = datetime.strptime(item["updated_at"].split("T")[0], "%Y-%m-%d")
formatted_date = datetime.utcfromtimestamp(date.timestamp())

97
searx/engines/geizhals.py Normal file
View file

@ -0,0 +1,97 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Geizhals is a German website to compare the price of a product on the
most common German shopping sites and find the lowest price.
The sorting of the search results can be influenced by the following additions
to the search term:
``asc`` or ``price``
To sort by price in ascending order.
``desc``
To sort by price in descending order.
"""
import re
from urllib.parse import urlencode
from lxml import html
from searx.utils import eval_xpath, eval_xpath_list, extract_text
about = {
'website': 'https://geizhals.de',
'wikidata_id': 'Q15977657',
'use_official_api': False,
'official_api_documentation': None,
'require_api_key': False,
'results': 'HTML',
'language': 'de',
}
paging = True
categories = ['shopping']
base_url = "https://geizhals.de"
sort_order = 'relevance'
SORT_RE = re.compile(r"sort:(\w+)")
sort_order_map = {
'relevance': None,
'price': 'p',
'asc': 'p',
'desc': '-p',
}
def request(query, params):
sort = None
sort_order_path = SORT_RE.search(query)
if sort_order_path:
sort = sort_order_map.get(sort_order_path.group(1))
query = SORT_RE.sub("", query)
logger.debug(query)
args = {
'fs': query,
'pg': params['pageno'],
'toggle_all': 1, # load item specs
'sort': sort,
}
params['url'] = f"{base_url}/?{urlencode(args)}"
return params
def response(resp):
results = []
dom = html.fromstring(resp.text)
for result in eval_xpath_list(dom, "//article[contains(@class, 'listview__item')]"):
content = []
for spec in eval_xpath_list(result, ".//div[contains(@class, 'specs-grid__item')]"):
content.append(f"{extract_text(eval_xpath(spec, './dt'))}: {extract_text(eval_xpath(spec, './dd'))}")
metadata = [
extract_text(eval_xpath(result, ".//div[contains(@class, 'stars-rating-label')]")),
extract_text(eval_xpath(result, ".//div[contains(@class, 'listview__offercount')]")),
]
item = {
'template': 'products.html',
'url': (
base_url + "/" + extract_text(eval_xpath(result, ".//a[contains(@class, 'listview__name-link')]/@href"))
),
'title': extract_text(eval_xpath(result, ".//h3[contains(@class, 'listview__name')]")),
'content': ' | '.join(content),
'thumbnail': extract_text(eval_xpath(result, ".//img[contains(@class, 'listview__image')]/@src")),
'metadata': ', '.join(item for item in metadata if item),
}
best_price = extract_text(eval_xpath(result, ".//a[contains(@class, 'listview__price-link')]")).split(" ")
if len(best_price) > 1:
item["price"] = f"Bestes Angebot: {best_price[1]}"
results.append(item)
return results

View file

@ -1,125 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Gentoo Wiki
"""
from urllib.parse import urlencode, urljoin
from lxml import html
from searx.utils import extract_text
# about
about = {
"website": 'https://wiki.gentoo.org/',
"wikidata_id": 'Q1050637',
"official_api_documentation": 'https://wiki.gentoo.org/api.php',
"use_official_api": False,
"require_api_key": False,
"results": 'HTML',
}
# engine dependent config
categories = ['it', 'software wikis']
paging = True
base_url = 'https://wiki.gentoo.org'
# xpath queries
xpath_results = '//ul[@class="mw-search-results"]/li'
xpath_link = './/div[@class="mw-search-result-heading"]/a'
xpath_content = './/div[@class="searchresult"]'
# cut 'en' from 'en-US', 'de' from 'de-CH', and so on
def locale_to_lang_code(locale):
if locale.find('-') >= 0:
locale = locale.split('-')[0]
return locale
# wikis for some languages were moved off from the main site, we need to make
# requests to correct URLs to be able to get results in those languages
lang_urls = {
'en': {'base': 'https://wiki.gentoo.org', 'search': '/index.php?title=Special:Search&offset={offset}&{query}'},
'others': {
'base': 'https://wiki.gentoo.org',
'search': '/index.php?title=Special:Search&offset={offset}&{query}\
&profile=translation&languagefilter={language}',
},
}
# get base & search URLs for selected language
def get_lang_urls(language):
if language != 'en':
return lang_urls['others']
return lang_urls['en']
# Language names to build search requests for
# those languages which are hosted on the main site.
main_langs = {
'ar': 'العربية',
'bg': 'Български',
'cs': 'Česky',
'da': 'Dansk',
'el': 'Ελληνικά',
'es': 'Español',
'he': 'עברית',
'hr': 'Hrvatski',
'hu': 'Magyar',
'it': 'Italiano',
'ko': '한국어',
'lt': 'Lietuviškai',
'nl': 'Nederlands',
'pl': 'Polski',
'pt': 'Português',
'ru': 'Русский',
'sl': 'Slovenský',
'th': 'ไทย',
'uk': 'Українська',
'zh': '简体中文',
}
# do search-request
def request(query, params):
# translate the locale (e.g. 'en-US') to language code ('en')
language = locale_to_lang_code(params['language'])
# if our language is hosted on the main site, we need to add its name
# to the query in order to narrow the results to that language
if language in main_langs:
query += ' (' + main_langs[language] + ')'
# prepare the request parameters
query = urlencode({'search': query})
offset = (params['pageno'] - 1) * 20
# get request URLs for our language of choice
urls = get_lang_urls(language)
search_url = urls['base'] + urls['search']
params['url'] = search_url.format(query=query, offset=offset, language=language)
return params
# get response from search-request
def response(resp):
# get the base URL for the language in which request was made
language = locale_to_lang_code(resp.search_params['language'])
url = get_lang_urls(language)['base']
results = []
dom = html.fromstring(resp.text)
# parse results
for result in dom.xpath(xpath_results):
link = result.xpath(xpath_link)[0]
href = urljoin(url, link.attrib.get('href'))
title = extract_text(link)
content = extract_text(result.xpath(xpath_content))
results.append({'url': href, 'title': title, 'content': content})
return results

View file

@ -1,7 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Engine to search in collaborative software platforms based on Gitea_.
"""Engine to search in collaborative software platforms based on Gitea_ or Forgejo_.
.. _Gitea: https://about.gitea.com/
.. _Forgejo: https://forgejo.org/
Configuration
=============
@ -23,6 +24,11 @@ Optional settings are:
base_url: https://gitea.com
shortcut: gitea
- name: forgejo.com
engine: gitea
base_url: https://code.forgejo.org
shortcut: forgejo
If you would like to use additional instances, just configure new engines in the
:ref:`settings <settings engine>` and set the ``base_url``.
@ -95,13 +101,14 @@ def response(resp):
'url': item.get('html_url'),
'title': item.get('full_name'),
'content': ' / '.join(content),
'img_src': item.get('owner', {}).get('avatar_url'),
# Use Repository Avatar and fall back to Owner Avatar if not set.
'thumbnail': item.get('avatar_url') or item.get('owner', {}).get('avatar_url'),
'package_name': item.get('name'),
'maintainer': item.get('owner', {}).get('login'),
'maintainer': item.get('owner', {}).get('username'),
'publishedDate': parser.parse(item.get("updated_at") or item.get("created_at")),
'tags': item.get('topics', []),
'popularity': item.get('stargazers_count'),
'homepage': item.get('homepage'),
'popularity': item.get('stars_count'),
'homepage': item.get('website'),
'source_code_url': item.get('clone_url'),
}
)

95
searx/engines/gitlab.py Normal file
View file

@ -0,0 +1,95 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Engine to search in collaborative software platforms based on GitLab_ with
the `GitLab REST API`_.
.. _GitLab: https://about.gitlab.com/install/
.. _GitLab REST API: https://docs.gitlab.com/ee/api/
Configuration
=============
The engine has the following mandatory setting:
- :py:obj:`base_url`
Optional settings are:
- :py:obj:`api_path`
.. code:: yaml
- name: gitlab
engine: gitlab
base_url: https://gitlab.com
shortcut: gl
about:
website: https://gitlab.com/
wikidata_id: Q16639197
- name: gnome
engine: gitlab
base_url: https://gitlab.gnome.org
shortcut: gn
about:
website: https://gitlab.gnome.org
wikidata_id: Q44316
Implementations
===============
"""
from urllib.parse import urlencode
from dateutil import parser
about = {
"website": None,
"wikidata_id": None,
"official_api_documentation": "https://docs.gitlab.com/ee/api/",
"use_official_api": True,
"require_api_key": False,
"results": "JSON",
}
categories = ['it', 'repos']
paging = True
base_url: str = ""
"""Base URL of the GitLab host."""
api_path: str = 'api/v4/projects'
"""The path the `project API <https://docs.gitlab.com/ee/api/projects.html>`_.
The default path should work fine usually.
"""
def request(query, params):
args = {'search': query, 'page': params['pageno']}
params['url'] = f"{base_url}/{api_path}?{urlencode(args)}"
return params
def response(resp):
results = []
for item in resp.json():
results.append(
{
'template': 'packages.html',
'url': item.get('web_url'),
'title': item.get('name'),
'content': item.get('description'),
'thumbnail': item.get('avatar_url'),
'package_name': item.get('name'),
'maintainer': item.get('namespace', {}).get('name'),
'publishedDate': parser.parse(item.get('last_activity_at') or item.get("created_at")),
'tags': item.get('tag_list', []),
'popularity': item.get('star_count'),
'homepage': item.get('readme_url'),
'source_code_url': item.get('http_url_to_repo'),
}
)
return results

View file

@ -59,11 +59,6 @@ filter_mapping = {0: 'off', 1: 'medium', 2: 'high'}
# specific xpath variables
# ------------------------
results_xpath = './/div[contains(@jscontroller, "SC7lYd")]'
title_xpath = './/a/h3[1]'
href_xpath = './/a[h3]/@href'
content_xpath = './/div[@data-sncf="1"]'
# Suggestions are links placed in a *card-section*, we extract only the text
# from the links not the links itself.
suggestion_xpath = '//div[contains(@class, "EIaa9b")]//a'
@ -334,31 +329,38 @@ def response(resp):
# results --> answer
answer_list = eval_xpath(dom, '//div[contains(@class, "LGOjhe")]')
for item in answer_list:
for bubble in eval_xpath(item, './/div[@class="nnFGuf"]'):
bubble.drop_tree()
results.append(
{
'answer': item.xpath("normalize-space()"),
'answer': extract_text(item),
'url': (eval_xpath(item, '../..//a/@href') + [None])[0],
}
)
# parse results
for result in eval_xpath_list(dom, results_xpath): # pylint: disable=too-many-nested-blocks
for result in eval_xpath_list(dom, './/div[contains(@jscontroller, "SC7lYd")]'):
# pylint: disable=too-many-nested-blocks
try:
title_tag = eval_xpath_getindex(result, title_xpath, 0, default=None)
title_tag = eval_xpath_getindex(result, './/a/h3[1]', 0, default=None)
if title_tag is None:
# this not one of the common google results *section*
logger.debug('ignoring item from the result_xpath list: missing title')
continue
title = extract_text(title_tag)
url = eval_xpath_getindex(result, href_xpath, 0, None)
url = eval_xpath_getindex(result, './/a[h3]/@href', 0, None)
if url is None:
logger.debug('ignoring item from the result_xpath list: missing url of title "%s"', title)
continue
content_nodes = eval_xpath(result, content_xpath)
content_nodes = eval_xpath(result, './/div[contains(@data-sncf, "1")]')
for item in content_nodes:
for script in item.xpath(".//script"):
script.getparent().remove(script)
content = extract_text(content_nodes)
if not content:
@ -439,7 +441,7 @@ def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True):
try:
locale = babel.Locale.parse(lang_map.get(eng_lang, eng_lang), sep='-')
except babel.UnknownLocaleError:
print("ERROR: %s -> %s is unknown by babel" % (x.get("data-name"), eng_lang))
print("INFO: google UI language %s (%s) is unknown by babel" % (eng_lang, x.text.split("(")[0].strip()))
continue
sxng_lang = language_tag(locale)

View file

@ -34,6 +34,7 @@ from searx.engines.google import (
detect_google_sorry,
)
from searx.enginelib.traits import EngineTraits
from searx.utils import get_embeded_stream_url
if TYPE_CHECKING:
import logging
@ -125,6 +126,7 @@ def response(resp):
'content': content,
'author': pub_info,
'thumbnail': thumbnail,
'iframe_src': get_embeded_stream_url(url),
'template': 'videos.html',
}
)

View file

@ -57,7 +57,11 @@ def request(query, params):
if params['time_range']:
search_type = 'search_by_date'
timestamp = (datetime.now() - relativedelta(**{f"{params['time_range']}s": 1})).timestamp()
timestamp = (
# pylint: disable=unexpected-keyword-arg
datetime.now()
- relativedelta(**{f"{params['time_range']}s": 1}) # type: ignore
).timestamp()
query_params["numericFilters"] = f"created_at_i>{timestamp}"
params["url"] = f"{base_url}/{search_type}?{urlencode(query_params)}"

View file

@ -1,71 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Internet Archive scholar(science)
"""
from datetime import datetime
from urllib.parse import urlencode
from searx.utils import html_to_text
about = {
"website": "https://scholar.archive.org/",
"wikidata_id": "Q115667709",
"official_api_documentation": "https://scholar.archive.org/api/redoc",
"use_official_api": True,
"require_api_key": False,
"results": "JSON",
}
categories = ['science', 'scientific publications']
paging = True
base_url = "https://scholar.archive.org"
results_per_page = 15
def request(query, params):
args = {
"q": query,
"limit": results_per_page,
"offset": (params["pageno"] - 1) * results_per_page,
}
params["url"] = f"{base_url}/search?{urlencode(args)}"
params["headers"]["Accept"] = "application/json"
return params
def response(resp):
results = []
json = resp.json()
for result in json["results"]:
publishedDate, content, doi = None, '', None
if result['biblio'].get('release_date'):
publishedDate = datetime.strptime(result['biblio']['release_date'], "%Y-%m-%d")
if len(result['abstracts']) > 0:
content = result['abstracts'][0].get('body')
elif len(result['_highlights']) > 0:
content = result['_highlights'][0]
if len(result['releases']) > 0:
doi = result['releases'][0].get('doi')
results.append(
{
'template': 'paper.html',
'url': result['fulltext']['access_url'],
'title': result['biblio'].get('title') or result['biblio'].get('container_name'),
'content': html_to_text(content),
'publisher': result['biblio'].get('publisher'),
'doi': doi,
'journal': result['biblio'].get('container_name'),
'authors': result['biblio'].get('contrib_names'),
'tags': result['tags'],
'publishedDate': publishedDate,
'issns': result['biblio'].get('issns'),
'pdf_url': result['fulltext'].get('access_url'),
}
)
return results

View file

@ -7,6 +7,8 @@ import random
from urllib.parse import quote_plus, urlparse
from dateutil import parser
from searx.utils import humanize_number
# about
about = {
"website": 'https://api.invidious.io/',
@ -91,7 +93,8 @@ def response(resp):
"url": url,
"title": result.get("title", ""),
"content": result.get("description", ""),
'length': length,
"length": length,
"views": humanize_number(result['viewCount']),
"template": "videos.html",
"author": result.get("author"),
"publishedDate": publishedDate,

View file

@ -16,23 +16,17 @@ from json import loads
from urllib.parse import urlencode
from searx.utils import to_string, html_to_text
# parameters for generating a request
search_url = None
url_query = None
url_prefix = ""
content_query = None
title_query = None
content_html_to_text = False
title_html_to_text = False
paging = False
suggestion_query = ''
results_query = ''
method = 'GET'
request_body = ''
cookies = {}
headers = {}
'''Some engines might offer different result based on cookies or headers.
Possible use-case: To set safesearch cookie or header to moderate.'''
paging = False
# parameters for engines with paging support
#
# number of results on each page
@ -41,6 +35,16 @@ page_size = 1
# number of the first page (usually 0 or 1)
first_page_num = 1
# parameters for parsing the response
results_query = ''
url_query = None
url_prefix = ""
title_query = None
content_query = None
suggestion_query = ''
title_html_to_text = False
content_html_to_text = False
def iterate(iterable):
if isinstance(iterable, dict):
@ -98,9 +102,8 @@ def query(data, query_string):
def request(query, params): # pylint: disable=redefined-outer-name
query = urlencode({'q': query})[2:]
fp = {'query': urlencode({'q': query})[2:]} # pylint: disable=invalid-name
fp = {'query': query} # pylint: disable=invalid-name
if paging and search_url.find('{pageno}') >= 0:
fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
@ -108,7 +111,12 @@ def request(query, params): # pylint: disable=redefined-outer-name
params['headers'].update(headers)
params['url'] = search_url.format(**fp)
params['query'] = query
params['method'] = method
if request_body:
# don't url-encode the query if it's in the request body
fp['query'] = query
params['data'] = request_body.format(**fp)
return params
@ -146,7 +154,11 @@ def response(resp):
}
)
else:
for url, title, content in zip(query(json, url_query), query(json, title_query), query(json, content_query)):
for result in json:
url = query(result, url_query)[0]
title = query(result, title_query)[0]
content = query(result, content_query)[0]
results.append(
{
'url': url_prefix + to_string(url),

View file

@ -31,6 +31,7 @@ def request(_query, params):
params['method'] = 'POST'
params['headers'] = {'Content-Type': 'application/json'}
params['req_url'] = request_url
return params
@ -40,7 +41,13 @@ def response(resp):
json_resp = resp.json()
text = json_resp.get('translatedText')
from_lang = resp.search_params["from_lang"][1]
to_lang = resp.search_params["to_lang"][1]
query = resp.search_params["query"]
req_url = resp.search_params["req_url"]
if text:
results.append({'answer': text})
results.append({"answer": text, "url": f"{req_url}/?source={from_lang}&target={to_lang}&q={query}"})
return results

View file

@ -27,7 +27,7 @@ categories = ['images']
paging = True
endpoint = 'photos'
base_url = 'https://loc.gov'
base_url = 'https://www.loc.gov'
search_string = "/{endpoint}/?sp={page}&{query}&fo=json"
@ -63,8 +63,8 @@ def response(resp):
if not url:
continue
img_src = result['item'].get('service_medium')
if not img_src or img_src == 'https://memory.loc.gov/pp/grp.gif':
img_list = result.get('image_url')
if not img_list:
continue
title = result['title']
@ -88,8 +88,8 @@ def response(resp):
'url': url,
'title': title,
'content': ' / '.join([i for i in content_items if i]),
'img_src': img_src,
'thumbnail_src': result['item'].get('thumb_gallery'),
'img_src': img_list[-1],
'thumbnail_src': img_list[0],
'author': author,
}
)

View file

@ -0,0 +1,95 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""MariaDB is a community driven fork of MySQL. Before enabling MariaDB engine,
you must the install the pip package ``mariadb`` along with the necessary
prerequities.
`See the following documentation for more details
<https://mariadb.com/docs/server/connect/programming-languages/c/install/>`_
Example
=======
This is an example configuration for querying a MariaDB server:
.. code:: yaml
- name: my_database
engine: mariadb_server
database: my_database
username: searxng
password: password
limit: 5
query_str: 'SELECT * from my_table WHERE my_column=%(query)s'
Implementations
===============
"""
from typing import TYPE_CHECKING
try:
import mariadb
except ImportError:
# import error is ignored because the admin has to install mysql manually to use
# the engine
pass
if TYPE_CHECKING:
import logging
logger = logging.getLogger()
engine_type = 'offline'
host = "127.0.0.1"
"""Hostname of the DB connector"""
port = 3306
"""Port of the DB connector"""
database = ""
"""Name of the database."""
username = ""
"""Username for the DB connection."""
password = ""
"""Password for the DB connection."""
query_str = ""
"""SQL query that returns the result items."""
limit = 10
paging = True
result_template = 'key-value.html'
_connection = None
def init(engine_settings):
global _connection # pylint: disable=global-statement
if 'query_str' not in engine_settings:
raise ValueError('query_str cannot be empty')
if not engine_settings['query_str'].lower().startswith('select '):
raise ValueError('only SELECT query is supported')
_connection = mariadb.connect(database=database, user=username, password=password, host=host, port=port)
def search(query, params):
query_params = {'query': query}
query_to_run = query_str + ' LIMIT {0} OFFSET {1}'.format(limit, (params['pageno'] - 1) * limit)
logger.debug("SQL Query: %s", query_to_run)
with _connection.cursor() as cur:
cur.execute(query_to_run, query_params)
results = []
col_names = [i[0] for i in cur.description]
for res in cur:
result = dict(zip(col_names, map(str, res)))
result['template'] = result_template
results.append(result)
return results

View file

@ -100,6 +100,12 @@ base_url: str = 'https://{language}.wikipedia.org/'
ISO 639-1 language code (en, de, fr ..) of the search language.
"""
api_path: str = 'w/api.php'
"""The path the PHP api is listening on.
The default path should work fine usually.
"""
timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
"""The longhand version of MediaWiki time strings."""
@ -113,12 +119,7 @@ def request(query, params):
else:
params['language'] = params['language'].split('-')[0]
if base_url.endswith('/'):
api_url = base_url + 'w/api.php?'
else:
api_url = base_url + '/w/api.php?'
api_url = api_url.format(language=params['language'])
api_url = f"{base_url.rstrip('/')}/{api_path}?".format(language=params['language'])
offset = (params['pageno'] - 1) * number_of_results
args = {

View file

@ -1,12 +1,15 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Mojeek (general, images, news)"""
from typing import TYPE_CHECKING
from datetime import datetime
from urllib.parse import urlencode
from lxml import html
from dateutil.relativedelta import relativedelta
from searx.utils import eval_xpath, eval_xpath_list, extract_text
from searx.enginelib.traits import EngineTraits
about = {
'website': 'https://mojeek.com',
@ -42,6 +45,18 @@ news_url_xpath = './/h2/a/@href'
news_title_xpath = './/h2/a'
news_content_xpath = './/p[@class="s"]'
language_param = 'lb'
region_param = 'arc'
_delta_kwargs = {'day': 'days', 'week': 'weeks', 'month': 'months', 'year': 'years'}
if TYPE_CHECKING:
import logging
logger = logging.getLogger()
traits: EngineTraits
def init(_):
if search_type not in ('', 'images', 'news'):
@ -53,13 +68,16 @@ def request(query, params):
'q': query,
'safe': min(params['safesearch'], 1),
'fmt': search_type,
language_param: traits.get_language(params['searxng_locale'], traits.custom['language_all']),
region_param: traits.get_region(params['searxng_locale'], traits.custom['region_all']),
}
if search_type == '':
args['s'] = 10 * (params['pageno'] - 1)
if params['time_range'] and search_type != 'images':
args["since"] = (datetime.now() - relativedelta(**{f"{params['time_range']}s": 1})).strftime("%Y%m%d")
kwargs = {_delta_kwargs[params['time_range']]: 1}
args["since"] = (datetime.now() - relativedelta(**kwargs)).strftime("%Y%m%d") # type: ignore
logger.debug(args["since"])
params['url'] = f"{base_url}/search?{urlencode(args)}"
@ -94,7 +112,7 @@ def _image_results(dom):
'template': 'images.html',
'url': extract_text(eval_xpath(result, image_url_xpath)),
'title': extract_text(eval_xpath(result, image_title_xpath)),
'img_src': base_url + extract_text(eval_xpath(result, image_img_src_xpath)),
'img_src': base_url + extract_text(eval_xpath(result, image_img_src_xpath)), # type: ignore
'content': '',
}
)
@ -130,3 +148,31 @@ def response(resp):
return _news_results(dom)
raise ValueError(f"Invalid search type {search_type}")
def fetch_traits(engine_traits: EngineTraits):
# pylint: disable=import-outside-toplevel
from searx import network
from searx.locales import get_official_locales, region_tag
from babel import Locale, UnknownLocaleError
import contextlib
resp = network.get(base_url + "/preferences", headers={'Accept-Language': 'en-US,en;q=0.5'})
dom = html.fromstring(resp.text) # type: ignore
languages = eval_xpath_list(dom, f'//select[@name="{language_param}"]/option/@value')
engine_traits.custom['language_all'] = languages[0]
for code in languages[1:]:
with contextlib.suppress(UnknownLocaleError):
locale = Locale(code)
engine_traits.languages[locale.language] = code
regions = eval_xpath_list(dom, f'//select[@name="{region_param}"]/option/@value')
engine_traits.custom['region_all'] = regions[1]
for code in regions[2:]:
for locale in get_official_locales(code, engine_traits.languages):
engine_traits.regions[region_tag(locale)] = code

View file

@ -20,6 +20,8 @@ Otherwise, follow instructions provided by Mullvad for enabling the VPN on Linux
update of SearXNG!
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from httpx import Response
from lxml import html
@ -37,6 +39,8 @@ traits: EngineTraits
use_cache: bool = True # non-cache use only has 100 searches per day!
leta_engine: str = 'google'
search_url = "https://leta.mullvad.net"
# about
@ -61,6 +65,11 @@ time_range_dict = {
"year": "y1",
}
available_leta_engines = [
'google', # first will be default if provided engine is invalid
'brave',
]
def is_vpn_connected(dom: html.HtmlElement) -> bool:
"""Returns true if the VPN is connected, False otherwise"""
@ -80,11 +89,22 @@ def assign_headers(headers: dict) -> dict:
def request(query: str, params: dict):
country = traits.get_region(params.get('searxng_locale', 'all'), traits.all_locale) # type: ignore
result_engine = leta_engine
if leta_engine not in available_leta_engines:
result_engine = available_leta_engines[0]
logger.warning(
'Configured engine "%s" not one of the available engines %s, defaulting to "%s"',
leta_engine,
available_leta_engines,
result_engine,
)
params['url'] = search_url
params['method'] = 'POST'
params['data'] = {
"q": query,
"gl": country if country is str else '',
'engine': result_engine,
}
# pylint: disable=undefined-variable
if use_cache:
@ -107,8 +127,15 @@ def request(query: str, params: dict):
return params
def extract_result(dom_result: html.HtmlElement):
[a_elem, h3_elem, p_elem] = eval_xpath_list(dom_result, 'div/div/*')
def extract_result(dom_result: list[html.HtmlElement]):
# Infoboxes sometimes appear in the beginning and will have a length of 0
if len(dom_result) == 3:
[a_elem, h3_elem, p_elem] = dom_result
elif len(dom_result) == 4:
[_, a_elem, h3_elem, p_elem] = dom_result
else:
return None
return {
'url': extract_text(a_elem.text),
'title': extract_text(h3_elem),
@ -116,6 +143,14 @@ def extract_result(dom_result: html.HtmlElement):
}
def extract_results(search_results: html.HtmlElement):
for search_result in search_results:
dom_result = eval_xpath_list(search_result, 'div/div/*')
result = extract_result(dom_result)
if result is not None:
yield result
def response(resp: Response):
"""Checks if connected to Mullvad VPN, then extracts the search results from
the DOM resp: requests response object"""
@ -124,7 +159,7 @@ def response(resp: Response):
if not is_vpn_connected(dom):
raise SearxEngineResponseException('Not connected to Mullvad VPN')
search_results = eval_xpath(dom.body, '//main/div[2]/div')
return [extract_result(sr) for sr in search_results]
return list(extract_results(search_results))
def fetch_traits(engine_traits: EngineTraits):

View file

@ -34,12 +34,25 @@ except ImportError:
engine_type = 'offline'
auth_plugin = 'caching_sha2_password'
host = "127.0.0.1"
"""Hostname of the DB connector"""
port = 3306
"""Port of the DB connector"""
database = ""
"""Name of the database."""
username = ""
"""Username for the DB connection."""
password = ""
"""Password for the DB connection."""
query_str = ""
"""SQL query that returns the result items."""
limit = 10
paging = True
result_template = 'key-value.html'

View file

@ -0,0 +1,71 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Open library (books)
"""
from urllib.parse import urlencode
import re
from dateutil import parser
about = {
'website': 'https://openlibrary.org',
'wikidata_id': 'Q1201876',
'require_api_key': False,
'use_official_api': False,
'official_api_documentation': 'https://openlibrary.org/developers/api',
}
paging = True
categories = []
base_url = "https://openlibrary.org"
results_per_page = 10
def request(query, params):
args = {
'q': query,
'page': params['pageno'],
'limit': results_per_page,
}
params['url'] = f"{base_url}/search.json?{urlencode(args)}"
return params
def _parse_date(date):
try:
return parser.parse(date)
except parser.ParserError:
return None
def response(resp):
results = []
for item in resp.json().get("docs", []):
cover = None
if 'lending_identifier_s' in item:
cover = f"https://archive.org/services/img/{item['lending_identifier_s']}"
published = item.get('publish_date')
if published:
published_dates = [date for date in map(_parse_date, published) if date]
if published_dates:
published = min(published_dates)
if not published:
published = parser.parse(str(item.get('first_published_year')))
result = {
'template': 'paper.html',
'url': f"{base_url}{item['key']}",
'title': item['title'],
'content': re.sub(r"\{|\}", "", item['first_sentence'][0]) if item.get('first_sentence') else '',
'isbn': item.get('isbn', [])[:5],
'authors': item.get('author_name', []),
'thumbnail': cover,
'publishedDate': published,
'tags': item.get('subject', [])[:10] + item.get('place', [])[:10],
}
results.append(result)
return results

View file

@ -14,7 +14,7 @@ import babel
from searx.network import get # see https://github.com/searxng/searxng/issues/762
from searx.locales import language_tag
from searx.utils import html_to_text
from searx.utils import html_to_text, humanize_number
from searx.enginelib.traits import EngineTraits
traits: EngineTraits
@ -124,6 +124,7 @@ def video_response(resp):
'content': html_to_text(result.get('description') or ''),
'author': result.get('account', {}).get('displayName'),
'length': minute_to_hm(result.get('duration')),
'views': humanize_number(result['views']),
'template': 'videos.html',
'publishedDate': parse(result['publishedAt']),
'iframe_src': result.get('embedUrl'),

View file

@ -53,6 +53,8 @@ from urllib.parse import urlencode
import datetime
from dateutil import parser
from searx.utils import humanize_number
# about
about = {
"website": 'https://github.com/TeamPiped/Piped/',
@ -138,6 +140,7 @@ def response(resp):
"title": result.get("title", ""),
"publishedDate": parser.parse(time.ctime(uploaded / 1000)) if uploaded != -1 else None,
"iframe_src": _frontend_url() + '/embed' + result.get("url", ""),
"views": humanize_number(result["views"]),
}
length = result.get("duration")
if length:

View file

@ -29,12 +29,25 @@ except ImportError:
pass
engine_type = 'offline'
host = "127.0.0.1"
"""Hostname of the DB connector"""
port = "5432"
"""Port of the DB connector"""
database = ""
"""Name of the database."""
username = ""
"""Username for the DB connection."""
password = ""
"""Password for the DB connection."""
query_str = ""
"""SQL query that returns the result items."""
limit = 10
paging = True
result_template = 'key-value.html'

View file

@ -49,7 +49,11 @@ from flask_babel import gettext
import babel
import lxml
from searx.exceptions import SearxEngineAPIException, SearxEngineTooManyRequestsException
from searx.exceptions import (
SearxEngineAPIException,
SearxEngineTooManyRequestsException,
SearxEngineCaptchaException,
)
from searx.network import raise_for_httperror
from searx.enginelib.traits import EngineTraits
@ -57,6 +61,7 @@ from searx.utils import (
eval_xpath,
eval_xpath_list,
extract_text,
get_embeded_stream_url,
)
traits: EngineTraits
@ -187,6 +192,8 @@ def parse_web_api(resp):
error_code = data.get('error_code')
if error_code == 24:
raise SearxEngineTooManyRequestsException()
if search_results.get("data", {}).get("error_data", {}).get("captchaUrl") is not None:
raise SearxEngineCaptchaException()
msg = ",".join(data.get('message', ['unknown']))
raise SearxEngineAPIException(f"{msg} ({error_code})")
@ -297,6 +304,7 @@ def parse_web_api(resp):
'title': title,
'url': res_url,
'content': content,
'iframe_src': get_embeded_stream_url(res_url),
'publishedDate': pub_date,
'thumbnail': thumbnail,
'template': 'videos.html',

View file

@ -165,10 +165,12 @@ def fetch_traits(engine_traits: EngineTraits):
countrycodes = set()
for region in country_list:
if region['iso_3166_1'] not in babel_reg_list:
# country_list contains duplicates that differ only in upper/lower case
_reg = region['iso_3166_1'].upper()
if _reg not in babel_reg_list:
print(f"ERROR: region tag {region['iso_3166_1']} is unknown by babel")
continue
countrycodes.add(region['iso_3166_1'])
countrycodes.add(_reg)
countrycodes = list(countrycodes)
countrycodes.sort()

View file

@ -1,98 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Słownik Języka Polskiego
Dictionary of the polish language from PWN (sjp.pwn)
"""
from lxml.html import fromstring
from searx import logger
from searx.utils import extract_text
from searx.network import raise_for_httperror
logger = logger.getChild('sjp engine')
# about
about = {
"website": 'https://sjp.pwn.pl',
"wikidata_id": 'Q55117369',
"official_api_documentation": None,
"use_official_api": False,
"require_api_key": False,
"results": 'HTML',
"language": 'pl',
}
categories = ['dictionaries']
paging = False
URL = 'https://sjp.pwn.pl'
SEARCH_URL = URL + '/szukaj/{query}.html'
word_xpath = '//div[@class="query"]'
dict_xpath = [
'//div[@class="wyniki sjp-so-wyniki sjp-so-anchor"]',
'//div[@class="wyniki sjp-wyniki sjp-anchor"]',
'//div[@class="wyniki sjp-doroszewski-wyniki sjp-doroszewski-anchor"]',
]
def request(query, params):
params['url'] = SEARCH_URL.format(query=query)
logger.debug(f"query_url --> {params['url']}")
return params
def response(resp):
results = []
raise_for_httperror(resp)
dom = fromstring(resp.text)
word = extract_text(dom.xpath(word_xpath))
definitions = []
for dict_src in dict_xpath:
for src in dom.xpath(dict_src):
src_text = extract_text(src.xpath('.//span[@class="entry-head-title"]/text()')).strip()
src_defs = []
for def_item in src.xpath('.//div[contains(@class, "ribbon-element")]'):
if def_item.xpath('./div[@class="znacz"]'):
sub_defs = []
for def_sub_item in def_item.xpath('./div[@class="znacz"]'):
def_sub_text = extract_text(def_sub_item).lstrip('0123456789. ')
sub_defs.append(def_sub_text)
src_defs.append((word, sub_defs))
else:
def_text = extract_text(def_item).strip()
def_link = def_item.xpath('./span/a/@href')
if 'doroszewski' in def_link[0]:
def_text = f"<a href='{def_link[0]}'>{def_text}</a>"
src_defs.append((def_text, ''))
definitions.append((src_text, src_defs))
if not definitions:
return results
infobox = ''
for src in definitions:
infobox += f"<div><small>{src[0]}</small>"
infobox += "<ul>"
for def_text, sub_def in src[1]:
infobox += f"<li>{def_text}</li>"
if sub_def:
infobox += "<ol>"
for sub_def_text in sub_def:
infobox += f"<li>{sub_def_text}</li>"
infobox += "</ol>"
infobox += "</ul></div>"
results.append(
{
'infobox': word,
'content': infobox,
}
)
return results

View file

@ -41,8 +41,13 @@ import sqlite3
import contextlib
engine_type = 'offline'
database = ""
"""Filename of the SQLite DB."""
query_str = ""
"""SQL query that returns the result items."""
limit = 10
paging = True
result_template = 'key-value.html'

View file

@ -142,7 +142,7 @@ search_url = base_url + '/sp/search'
# specific xpath variables
# ads xpath //div[@id="results"]/div[@id="sponsored"]//div[@class="result"]
# not ads: div[@class="result"] are the direct childs of div[@id="results"]
# not ads: div[@class="result"] are the direct children of div[@id="results"]
search_form_xpath = '//form[@id="search"]'
"""XPath of Startpage's origin search form

View file

@ -7,6 +7,7 @@ ends.
from json import dumps
from searx.utils import searx_useragent
from searx.enginelib.traits import EngineTraits
about = {
"website": "https://stract.com/",
@ -18,7 +19,10 @@ about = {
categories = ['general']
paging = True
search_url = "https://stract.com/beta/api/search"
base_url = "https://stract.com/beta/api"
search_url = base_url + "/search"
traits: EngineTraits
def request(query, params):
@ -29,7 +33,14 @@ def request(query, params):
'Content-Type': 'application/json',
'User-Agent': searx_useragent(),
}
params['data'] = dumps({'query': query, 'page': params['pageno'] - 1})
region = traits.get_region(params["searxng_locale"], default=traits.all_locale)
params['data'] = dumps(
{
'query': query,
'page': params['pageno'] - 1,
'selectedRegion': region,
}
)
return params
@ -47,3 +58,24 @@ def response(resp):
)
return results
def fetch_traits(engine_traits: EngineTraits):
# pylint: disable=import-outside-toplevel
from searx import network
from babel import Locale, languages
from searx.locales import region_tag
territories = Locale("en").territories
json = network.get(base_url + "/docs/openapi.json").json()
regions = json['components']['schemas']['Region']['enum']
engine_traits.all_locale = regions[0]
for region in regions[1:]:
for code, name in territories.items():
if region not in (code, name):
continue
for lang in languages.get_official_languages(code, de_facto=True):
engine_traits.regions[region_tag(Locale(lang, code))] = region

View file

@ -14,10 +14,16 @@ billion images `[tineye.com] <https://tineye.com/how>`_.
"""
from typing import TYPE_CHECKING
from urllib.parse import urlencode
from datetime import datetime
from flask_babel import gettext
if TYPE_CHECKING:
import logging
logger = logging.getLogger()
about = {
"website": 'https://tineye.com',
"wikidata_id": 'Q2382535',
@ -34,7 +40,7 @@ categories = ['general']
paging = True
safesearch = False
base_url = 'https://tineye.com'
search_string = '/result_json/?page={page}&{query}'
search_string = '/api/v1/result_json/?page={page}&{query}'
FORMAT_NOT_SUPPORTED = gettext(
"Could not read that image url. This may be due to an unsupported file"
@ -120,7 +126,7 @@ def parse_tineye_match(match_json):
crawl_date = backlink_json.get("crawl_date")
if crawl_date:
crawl_date = datetime.fromisoformat(crawl_date[:-3])
crawl_date = datetime.strptime(crawl_date, '%Y-%m-%d')
else:
crawl_date = datetime.min
@ -150,29 +156,15 @@ def parse_tineye_match(match_json):
def response(resp):
"""Parse HTTP response from TinEye."""
results = []
try:
# handle the 422 client side errors, and the possible 400 status code error
if resp.status_code in (400, 422):
json_data = resp.json()
except Exception as exc: # pylint: disable=broad-except
msg = "can't parse JSON response // %s" % exc
logger.error(msg)
json_data = {'error': msg}
# handle error codes from Tineye
if resp.is_error:
if resp.status_code in (400, 422):
message = 'HTTP status: %s' % resp.status_code
error = json_data.get('error')
s_key = json_data.get('suggestions', {}).get('key', '')
if error and s_key:
message = "%s (%s)" % (error, s_key)
elif error:
message = error
suggestions = json_data.get('suggestions', {})
message = f'HTTP Status Code: {resp.status_code}'
if resp.status_code == 422:
s_key = suggestions.get('key', '')
if s_key == "Invalid image URL":
# test https://docs.searxng.org/_static/searxng-wordmark.svg
message = FORMAT_NOT_SUPPORTED
@ -182,16 +174,23 @@ def response(resp):
elif s_key == 'Download Error':
# test https://notexists
message = DOWNLOAD_ERROR
else:
logger.warning("Unknown suggestion key encountered: %s", s_key)
else: # 400
description = suggestions.get('description')
if isinstance(description, list):
message = ','.join(description)
# see https://github.com/searxng/searxng/pull/1456#issuecomment-1193105023
# results.append({'answer': message})
logger.error(message)
# see https://github.com/searxng/searxng/pull/1456#issuecomment-1193105023
# results.append({'answer': message})
logger.error(message)
return []
return results
# Raise for all other responses
resp.raise_for_status()
resp.raise_for_status()
# append results from matches
results = []
json_data = resp.json()
for match_json in json_data['matches']:
@ -209,7 +208,7 @@ def response(resp):
'title': backlink['image_name'],
'img_src': backlink['url'],
'format': tineye_match['image_format'],
'widht': tineye_match['width'],
'width': tineye_match['width'],
'height': tineye_match['height'],
'publishedDate': backlink['crawl_date'],
}

View file

@ -32,7 +32,7 @@ void_arch = 'x86_64'
"""Default architecture to search for. For valid values see :py:obj:`ARCH_RE`"""
ARCH_RE = re.compile('aarch64-musl|armv6l-musl|armv7l-musl|x86_64-musl|aarch64|armv6l|armv7l|i686|x86_64')
"""Regular expresion that match a architecture in the query string."""
"""Regular expression that match a architecture in the query string."""
def request(query, params):

View file

@ -7,6 +7,8 @@ import datetime
from urllib.parse import urlencode
from searx.utils import html_to_text, humanize_bytes
# about
about = {
"website": 'https://commons.wikimedia.org/',
@ -74,7 +76,7 @@ def response(resp):
result = {
'url': imageinfo["descriptionurl"],
'title': title,
'content': item["snippet"],
'content': html_to_text(item["snippet"]),
}
if search_type == "images":
@ -93,7 +95,7 @@ def response(resp):
elif search_type == "files":
result['template'] = 'files.html'
result['metadata'] = imageinfo['mime']
result['size'] = imageinfo['size']
result['size'] = humanize_bytes(imageinfo['size'])
elif search_type == "audio":
result['iframe_src'] = imageinfo['url']

View file

@ -20,13 +20,9 @@ about = {
categories = ['general']
paging = False
URL = 'https://www.wordnik.com'
SEARCH_URL = URL + '/words/{query}'
def request(query, params):
params['url'] = SEARCH_URL.format(query=query)
logger.debug(f"query_url --> {params['url']}")
params['url'] = f"https://www.wordnik.com/words/{query}"
return params

View file

@ -12,6 +12,8 @@ Request:
- :py:obj:`search_url`
- :py:obj:`lang_all`
- :py:obj:`soft_max_redirects`
- :py:obj:`method`
- :py:obj:`request_body`
- :py:obj:`cookies`
- :py:obj:`headers`
@ -151,6 +153,16 @@ headers = {}
'''Some engines might offer different result based headers. Possible use-case:
To set header to moderate.'''
method = 'GET'
'''Some engines might require to do POST requests for search.'''
request_body = ''
'''The body of the request. This can only be used if different :py:obj:`method`
is set, e.g. ``POST``. For formatting see the documentation of :py:obj:`search_url`::
search={query}&page={pageno}{time_range}{safe_search}
'''
paging = False
'''Engine supports paging [True or False].'''
@ -236,8 +248,14 @@ def request(query, params):
params['headers'].update(headers)
params['url'] = search_url.format(**fargs)
params['soft_max_redirects'] = soft_max_redirects
params['method'] = method
if request_body:
# don't url-encode the query if it's in the request body
fargs['query'] = query
params['data'] = request_body.format(**fargs)
params['soft_max_redirects'] = soft_max_redirects
params['raise_for_httperror'] = False
return params

View file

@ -118,6 +118,8 @@ def _base_url() -> str:
url = engines['yacy'].base_url # type: ignore
if isinstance(url, list):
url = random.choice(url)
if url.endswith("/"):
url = url[:-1]
return url

View file

@ -16,6 +16,7 @@ from searx.utils import (
eval_xpath_getindex,
eval_xpath_list,
extract_text,
html_to_text,
)
from searx.enginelib.traits import EngineTraits
@ -133,12 +134,20 @@ def response(resp):
url = parse_url(url)
title = eval_xpath_getindex(result, './/h3//a/@aria-label', 0, default='')
title = extract_text(title)
title: str = extract_text(title)
content = eval_xpath_getindex(result, './/div[contains(@class, "compText")]', 0, default='')
content = extract_text(content, allow_none=True)
content: str = extract_text(content, allow_none=True)
# append result
results.append({'url': url, 'title': title, 'content': content})
results.append(
{
'url': url,
# title sometimes contains HTML tags / see
# https://github.com/searxng/searxng/issues/3790
'title': " ".join(html_to_text(title).strip().split()),
'content': " ".join(html_to_text(content).strip().split()),
}
)
for suggestion in eval_xpath_list(dom, '//div[contains(@class, "AlsoTry")]//table//a'):
# append suggestion

133
searx/engines/yandex.py Normal file
View file

@ -0,0 +1,133 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Yandex (Web, images)"""
from json import loads
from urllib.parse import urlencode
from html import unescape
from lxml import html
from searx.exceptions import SearxEngineCaptchaException
from searx.utils import humanize_bytes, eval_xpath, eval_xpath_list, extract_text, extr
# Engine metadata
about = {
"website": 'https://yandex.com/',
"wikidata_id": 'Q5281',
"official_api_documentation": "?",
"use_official_api": False,
"require_api_key": False,
"results": 'HTML',
}
# Engine configuration
categories = []
paging = True
search_type = ""
# Search URL
base_url_web = 'https://yandex.com/search/site/'
base_url_images = 'https://yandex.com/images/search'
results_xpath = '//li[contains(@class, "serp-item")]'
url_xpath = './/a[@class="b-serp-item__title-link"]/@href'
title_xpath = './/h3[@class="b-serp-item__title"]/a[@class="b-serp-item__title-link"]/span'
content_xpath = './/div[@class="b-serp-item__content"]//div[@class="b-serp-item__text"]'
def catch_bad_response(resp):
if resp.url.path.startswith('/showcaptcha'):
raise SearxEngineCaptchaException()
def request(query, params):
query_params_web = {
"tmpl_version": "releases",
"text": query,
"web": "1",
"frame": "1",
"searchid": "3131712",
}
query_params_images = {
"text": query,
"uinfo": "sw-1920-sh-1080-ww-1125-wh-999",
}
if params['pageno'] > 1:
query_params_web.update({"p": params["pageno"] - 1})
query_params_images.update({"p": params["pageno"] - 1})
params["cookies"] = {'cookie': "yp=1716337604.sp.family%3A0#1685406411.szm.1:1920x1080:1920x999"}
if search_type == 'web':
params['url'] = f"{base_url_web}?{urlencode(query_params_web)}"
elif search_type == 'images':
params['url'] = f"{base_url_images}?{urlencode(query_params_images)}"
return params
def response(resp):
if search_type == 'web':
catch_bad_response(resp)
dom = html.fromstring(resp.text)
results = []
for result in eval_xpath_list(dom, results_xpath):
results.append(
{
'url': extract_text(eval_xpath(result, url_xpath)),
'title': extract_text(eval_xpath(result, title_xpath)),
'content': extract_text(eval_xpath(result, content_xpath)),
}
)
return results
if search_type == 'images':
catch_bad_response(resp)
html_data = html.fromstring(resp.text)
html_sample = unescape(html.tostring(html_data, encoding='unicode'))
content_between_tags = extr(
html_sample, '{"location":"/images/search/', 'advRsyaSearchColumn":null}}', default="fail"
)
json_data = '{"location":"/images/search/' + content_between_tags + 'advRsyaSearchColumn":null}}'
if content_between_tags == "fail":
content_between_tags = extr(html_sample, '{"location":"/images/search/', 'false}}}')
json_data = '{"location":"/images/search/' + content_between_tags + 'false}}}'
json_resp = loads(json_data)
results = []
for _, item_data in json_resp['initialState']['serpList']['items']['entities'].items():
title = item_data['snippet']['title']
source = item_data['snippet']['url']
thumb = item_data['image']
fullsize_image = item_data['viewerData']['dups'][0]['url']
height = item_data['viewerData']['dups'][0]['h']
width = item_data['viewerData']['dups'][0]['w']
filesize = item_data['viewerData']['dups'][0]['fileSizeInBytes']
humanized_filesize = humanize_bytes(filesize)
results.append(
{
'title': title,
'url': source,
'img_src': fullsize_image,
'filesize': humanized_filesize,
'thumbnail_src': thumb,
'template': 'images.html',
'resolution': f'{width} x {height}',
}
)
return results
return []

View file

@ -67,6 +67,8 @@ def response(resp):
for result in resp.json()[1]['results']:
if search_type == "web":
if result['type'] != 'Organic':
continue
results.append(_web_result(result))
elif search_type == "images":
results.append(_images_result(result))

View file

@ -43,6 +43,7 @@ from flask_babel import gettext
from searx.utils import extract_text, eval_xpath, eval_xpath_list
from searx.enginelib.traits import EngineTraits
from searx.data import ENGINE_TRAITS
from searx.exceptions import SearxException
if TYPE_CHECKING:
import httpx
@ -108,13 +109,21 @@ def request(query: str, params: Dict[str, Any]) -> Dict[str, Any]:
zlib_year_to=zlib_year_to,
zlib_ext=zlib_ext,
)
params["verify"] = False
return params
def domain_is_seized(dom):
return bool(dom.xpath('//title') and "seized" in dom.xpath('//title')[0].text.lower())
def response(resp: httpx.Response) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
dom = html.fromstring(resp.text)
if domain_is_seized(dom):
raise SearxException(f"zlibrary domain is seized: {base_url}")
for item in dom.xpath('//div[@id="searchResultBox"]//div[contains(@class, "resItemBox")]'):
results.append(_parse_result(item))
@ -168,22 +177,30 @@ def _parse_result(item) -> Dict[str, Any]:
def fetch_traits(engine_traits: EngineTraits) -> None:
"""Fetch languages and other search arguments from zlibrary's search form."""
# pylint: disable=import-outside-toplevel
# pylint: disable=import-outside-toplevel, too-many-branches
import babel
from searx.network import get # see https://github.com/searxng/searxng/issues/762
from searx.locales import language_tag
resp = get(base_url, verify=False)
if not resp.ok: # type: ignore
raise RuntimeError("Response from zlibrary's search page is not OK.")
dom = html.fromstring(resp.text) # type: ignore
if domain_is_seized(dom):
print(f"ERROR: zlibrary domain is seized: {base_url}")
# don't change anything, re-use the existing values
engine_traits.all_locale = ENGINE_TRAITS["z-library"]["all_locale"]
engine_traits.custom = ENGINE_TRAITS["z-library"]["custom"]
engine_traits.languages = ENGINE_TRAITS["z-library"]["languages"]
return
engine_traits.all_locale = ""
engine_traits.custom["ext"] = []
engine_traits.custom["year_from"] = []
engine_traits.custom["year_to"] = []
resp = get(base_url)
if not resp.ok: # type: ignore
raise RuntimeError("Response from zlibrary's search page is not OK.")
dom = html.fromstring(resp.text) # type: ignore
for year in eval_xpath_list(dom, "//div[@id='advSearch-noJS']//select[@id='sf_yearFrom']/option"):
engine_traits.custom["year_from"].append(year.get("value"))