This commit is contained in:
Joseph Cheung 2023-02-24 08:54:06 +08:00
parent 4e51be0fda
commit 203079aecf
8 changed files with 4525 additions and 967 deletions

View file

@ -5,20 +5,20 @@
"""
# pylint: disable=use-dict-literal
from json import loads
import json
from urllib.parse import urlencode
from lxml import etree
import lxml
from httpx import HTTPError
from searx import settings
from searx.data import ENGINES_LANGUAGES
from searx.engines import (
engines,
google,
)
from searx.network import get as http_get
from searx.exceptions import SearxEngineResponseException
# a fetch_supported_languages() for XPath engines isn't available right now
# _brave = ENGINES_LANGUAGES['brave'].keys()
def get(*args, **kwargs):
if 'timeout' not in kwargs:
@ -55,34 +55,58 @@ def dbpedia(query, _lang):
results = []
if response.ok:
dom = etree.fromstring(response.content)
dom = lxml.etree.fromstring(response.content)
results = dom.xpath('//Result/Label//text()')
return results
def duckduckgo(query, _lang):
# duckduckgo autocompleter
url = 'https://ac.duckduckgo.com/ac/?{0}&type=list'
def duckduckgo(query, sxng_locale):
"""Autocomplete from DuckDuckGo. Supports DuckDuckGo's languages"""
resp = loads(get(url.format(urlencode(dict(q=query)))).text)
if len(resp) > 1:
return resp[1]
return []
traits = engines['duckduckgo'].traits
args = {
'q': query,
'kl': traits.get_region(sxng_locale, traits.all_locale),
}
url = 'https://duckduckgo.com/ac/?type=list&' + urlencode(args)
resp = get(url)
ret_val = []
if resp.ok:
j = resp.json()
if len(j) > 1:
ret_val = j[1]
return ret_val
def google(query, lang):
# google autocompleter
autocomplete_url = 'https://suggestqueries.google.com/complete/search?client=toolbar&'
def google_complete(query, sxng_locale):
"""Autocomplete from Google. Supports Google's languages and subdomains
(:py:obj:`searx.engines.google.get_google_info`) by using the async REST
API::
response = get(autocomplete_url + urlencode(dict(hl=lang, q=query)))
https://{subdomain}/complete/search?{args}
"""
google_info = google.get_google_info({'searxng_locale': sxng_locale}, engines['google'].traits)
url = 'https://{subdomain}/complete/search?{args}'
args = urlencode(
{
'q': query,
'client': 'gws-wiz',
'hl': google_info['params']['hl'],
}
)
results = []
if response.ok:
dom = etree.fromstring(response.text)
results = dom.xpath('//suggestion/@data')
resp = get(url.format(subdomain=google_info['subdomain'], args=args))
if resp.ok:
json_txt = resp.text[resp.text.find('[') : resp.text.find(']', -3) + 1]
data = json.loads(json_txt)
for item in data[0]:
results.append(lxml.html.fromstring(item[0]).text_content())
return results
@ -109,9 +133,9 @@ def seznam(query, _lang):
]
def startpage(query, lang):
# startpage autocompleter
lui = ENGINES_LANGUAGES['startpage'].get(lang, 'english')
def startpage(query, sxng_locale):
"""Autocomplete from Startpage. Supports Startpage's languages"""
lui = engines['startpage'].traits.get_language(sxng_locale, 'english')
url = 'https://startpage.com/suggestions?{query}'
resp = get(url.format(query=urlencode({'q': query, 'segment': 'startpage.udog', 'lui': lui})))
data = resp.json()
@ -122,20 +146,20 @@ def swisscows(query, _lang):
# swisscows autocompleter
url = 'https://swisscows.ch/api/suggest?{query}&itemsCount=5'
resp = loads(get(url.format(query=urlencode({'query': query}))).text)
resp = json.loads(get(url.format(query=urlencode({'query': query}))).text)
return resp
def qwant(query, lang):
# qwant autocompleter (additional parameter : lang=en_en&count=xxx )
url = 'https://api.qwant.com/api/suggest?{query}'
resp = get(url.format(query=urlencode({'q': query, 'lang': lang})))
def qwant(query, sxng_locale):
"""Autocomplete from Qwant. Supports Qwant's regions."""
results = []
locale = engines['startpage'].traits.get_region(sxng_locale, 'en_US')
url = 'https://api.qwant.com/v3/suggest?{query}'
resp = get(url.format(query=urlencode({'q': query, 'locale': locale, 'version': '2'})))
if resp.ok:
data = loads(resp.text)
data = resp.json()
if data['status'] == 'success':
for item in data['data']['items']:
results.append(item['value'])
@ -143,21 +167,38 @@ def qwant(query, lang):
return results
def wikipedia(query, lang):
# wikipedia autocompleter
url = 'https://' + lang + '.wikipedia.org/w/api.php?action=opensearch&{0}&limit=10&namespace=0&format=json'
def wikipedia(query, sxng_locale):
"""Autocomplete from Wikipedia. Supports Wikipedia's languages (aka netloc)."""
results = []
eng_traits = engines['wikipedia'].traits
wiki_lang = eng_traits.get_language(sxng_locale, 'en')
wiki_netloc = eng_traits.custom['wiki_netloc'].get(wiki_lang, 'en.wikipedia.org')
resp = loads(get(url.format(urlencode(dict(search=query)))).text)
if len(resp) > 1:
return resp[1]
return []
url = 'https://{wiki_netloc}/w/api.php?{args}'
args = urlencode(
{
'action': 'opensearch',
'format': 'json',
'formatversion': '2',
'search': query,
'namespace': '0',
'limit': '10',
}
)
resp = get(url.format(args=args, wiki_netloc=wiki_netloc))
if resp.ok:
data = resp.json()
if len(data) > 1:
results = data[1]
return results
def yandex(query, _lang):
# yandex autocompleter
url = "https://suggest.yandex.com/suggest-ff.cgi?{0}"
resp = loads(get(url.format(urlencode(dict(part=query)))).text)
resp = json.loads(get(url.format(urlencode(dict(part=query)))).text)
if len(resp) > 1:
return resp[1]
return []
@ -166,7 +207,7 @@ def yandex(query, _lang):
backends = {
'dbpedia': dbpedia,
'duckduckgo': duckduckgo,
'google': google,
'google': google_complete,
'seznam': seznam,
'startpage': startpage,
'swisscows': swisscows,
@ -177,12 +218,11 @@ backends = {
}
def search_autocomplete(backend_name, query, lang):
def search_autocomplete(backend_name, query, sxng_locale):
backend = backends.get(backend_name)
if backend is None:
return []
try:
return backend(query, lang)
return backend(query, sxng_locale)
except (HTTPError, SearxEngineResponseException):
return []

File diff suppressed because it is too large Load diff

View file

@ -1,34 +1,39 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google WEB engine. Some of this
implementations are shared by other engines:
"""This is the implementation of the Google WEB engine. Some of this
implementations (manly the :py:obj:`get_google_info`) are shared by other
engines:
- :ref:`google images engine`
- :ref:`google news engine`
- :ref:`google videos engine`
The google WEB engine itself has a special setup option:
.. code:: yaml
- name: google
...
use_mobile_ui: false
``use_mobile_ui``: (default: ``false``)
Enables to use *mobile endpoint* to bypass the google blocking (see
:issue:`159`). On the mobile UI of Google Search, the button :guilabel:`More
results` is not affected by Google rate limiting and we can still do requests
while actively blocked by the original Google search. By activate
``use_mobile_ui`` this behavior is simulated by adding the parameter
``async=use_ac:true,_fmt:pc`` to the :py:func:`request`.
- :ref:`google scholar engine`
- :ref:`google autocomplete`
"""
from typing import TYPE_CHECKING
import re
from urllib.parse import urlencode
from lxml import html
from searx.utils import match_language, extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex
import babel
import babel.core
import babel.languages
from searx.utils import extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex
from searx.locales import language_tag, region_tag, get_offical_locales
from searx import network
from searx.exceptions import SearxEngineCaptchaException
from searx.enginelib.traits import EngineTraits
if TYPE_CHECKING:
import logging
logger: logging.Logger
traits: EngineTraits
# about
about = {
@ -45,64 +50,6 @@ categories = ['general', 'web']
paging = True
time_range_support = True
safesearch = True
send_accept_language_header = True
use_mobile_ui = False
supported_languages_url = 'https://www.google.com/preferences?#languages'
# based on https://en.wikipedia.org/wiki/List_of_Google_domains and tests
google_domains = {
'BG': 'google.bg', # Bulgaria
'CZ': 'google.cz', # Czech Republic
'DE': 'google.de', # Germany
'DK': 'google.dk', # Denmark
'AT': 'google.at', # Austria
'CH': 'google.ch', # Switzerland
'GR': 'google.gr', # Greece
'AU': 'google.com.au', # Australia
'CA': 'google.ca', # Canada
'GB': 'google.co.uk', # United Kingdom
'ID': 'google.co.id', # Indonesia
'IE': 'google.ie', # Ireland
'IN': 'google.co.in', # India
'MY': 'google.com.my', # Malaysia
'NZ': 'google.co.nz', # New Zealand
'PH': 'google.com.ph', # Philippines
'SG': 'google.com.sg', # Singapore
'US': 'google.com', # United States (google.us) redirects to .com
'ZA': 'google.co.za', # South Africa
'AR': 'google.com.ar', # Argentina
'CL': 'google.cl', # Chile
'ES': 'google.es', # Spain
'MX': 'google.com.mx', # Mexico
'EE': 'google.ee', # Estonia
'FI': 'google.fi', # Finland
'BE': 'google.be', # Belgium
'FR': 'google.fr', # France
'IL': 'google.co.il', # Israel
'HR': 'google.hr', # Croatia
'HU': 'google.hu', # Hungary
'IT': 'google.it', # Italy
'JP': 'google.co.jp', # Japan
'KR': 'google.co.kr', # South Korea
'LT': 'google.lt', # Lithuania
'LV': 'google.lv', # Latvia
'NO': 'google.no', # Norway
'NL': 'google.nl', # Netherlands
'PL': 'google.pl', # Poland
'BR': 'google.com.br', # Brazil
'PT': 'google.pt', # Portugal
'RO': 'google.ro', # Romania
'RU': 'google.ru', # Russia
'SK': 'google.sk', # Slovakia
'SI': 'google.si', # Slovenia
'SE': 'google.se', # Sweden
'TH': 'google.co.th', # Thailand
'TR': 'google.com.tr', # Turkey
'UA': 'google.com.ua', # Ukraine
'CN': 'google.com.hk', # There is no google.cn, we use .com.hk for zh-CN
'HK': 'google.com.hk', # Hong Kong
'TW': 'google.com.tw', # Taiwan
}
time_range_dict = {'day': 'd', 'week': 'w', 'month': 'm', 'year': 'y'}
@ -115,47 +62,50 @@ filter_mapping = {0: 'off', 1: 'medium', 2: 'high'}
results_xpath = './/div[@data-sokoban-container]'
title_xpath = './/a/h3[1]'
href_xpath = './/a[h3]/@href'
content_xpath = './/div[@data-content-feature=1]'
content_xpath = './/div[@data-content-feature]'
# google *sections* are no usual *results*, we ignore them
g_section_with_header = './g-section-with-header'
# Suggestions are links placed in a *card-section*, we extract only the text
# from the links not the links itself.
suggestion_xpath = '//div[contains(@class, "EIaa9b")]//a'
# UI_ASYNC = 'use_ac:true,_fmt:html' # returns a HTTP 500 when user search for
# # celebrities like '!google natasha allegri'
# # or '!google chris evans'
UI_ASYNC = 'use_ac:true,_fmt:prog'
"""Format of the response from UI's async request."""
def get_lang_info(params, lang_list, custom_aliases, supported_any_language):
"""Composing various language properties for the google engines.
def get_google_info(params, eng_traits):
"""Composing various (language) properties for the google engines (:ref:`google
API`).
This function is called by the various google engines (:ref:`google web
engine`, :ref:`google images engine`, :ref:`google news engine` and
:ref:`google videos engine`).
:param dict param: request parameters of the engine
:param dict param: Request parameters of the engine. At least
a ``searxng_locale`` key should be in the dictionary.
:param list lang_list: list of supported languages of the engine
:py:obj:`ENGINES_LANGUAGES[engine-name] <searx.data.ENGINES_LANGUAGES>`
:param dict lang_list: custom aliases for non standard language codes
(used when calling :py:func:`searx.utils.match_language`)
:param bool supported_any_language: When a language is not specified, the
language interpretation is left up to Google to decide how the search
results should be delivered. This argument is ``True`` for the google
engine and ``False`` for the other engines (google-images, -news,
-scholar, -videos).
:param eng_traits: Engine's traits fetched from google preferences
(:py:obj:`searx.enginelib.traits.EngineTraits`)
:rtype: dict
:returns:
Py-Dictionary with the key/value pairs:
language:
Return value from :py:func:`searx.utils.match_language`
The language code that is used by google (e.g. ``lang_en`` or
``lang_zh-TW``)
country:
The country code (e.g. US, AT, CA, FR, DE ..)
The country code that is used by google (e.g. ``US`` or ``TW``)
locale:
A instance of :py:obj:`babel.core.Locale` build from the
``searxng_locale`` value.
subdomain:
Google subdomain :py:obj:`google_domains` that fits to the country
@ -165,52 +115,67 @@ def get_lang_info(params, lang_list, custom_aliases, supported_any_language):
Py-Dictionary with additional request arguments (can be passed to
:py:func:`urllib.parse.urlencode`).
- ``hl`` parameter: specifies the interface language of user interface.
- ``lr`` parameter: restricts search results to documents written in
a particular language.
- ``cr`` parameter: restricts search results to documents
originating in a particular country.
- ``ie`` parameter: sets the character encoding scheme that should
be used to interpret the query string ('utf8').
- ``oe`` parameter: sets the character encoding scheme that should
be used to decode the XML result ('utf8').
headers:
Py-Dictionary with additional HTTP headers (can be passed to
request's headers)
- ``Accept: '*/*``
"""
ret_val = {
'language': None,
'country': None,
'subdomain': None,
'params': {},
'headers': {},
'cookies': {},
'locale': None,
}
# language ...
sxng_locale = params.get('searxng_locale', 'all')
try:
locale = babel.Locale.parse(sxng_locale, sep='-')
except babel.core.UnknownLocaleError:
locale = None
_lang = params['language']
_any_language = _lang.lower() == 'all'
if _any_language:
_lang = 'en-US'
language = match_language(_lang, lang_list, custom_aliases)
ret_val['language'] = language
eng_lang = eng_traits.get_language(sxng_locale, 'lang_en')
lang_code = eng_lang.split('_')[-1] # lang_zh-TW --> zh-TW / lang_en --> en
country = eng_traits.get_region(sxng_locale, eng_traits.all_locale)
# country ...
# Test zh_hans & zh_hant --> in the topmost links in the result list of list
# TW and HK you should a find wiktionary.org zh_hant link. In the result
# list of zh-CN should not be no hant link instead you should find
# zh.m.wikipedia.org/zh somewhere in the top.
_l = _lang.split('-')
if len(_l) == 2:
country = _l[1]
else:
country = _l[0].upper()
if country == 'EN':
country = 'US'
# '!go 日 :zh-TW' --> https://zh.m.wiktionary.org/zh-hant/%E6%97%A5
# '!go 日 :zh-CN' --> https://zh.m.wikipedia.org/zh/%E6%97%A5
ret_val['language'] = eng_lang
ret_val['country'] = country
# subdomain ...
ret_val['subdomain'] = 'www.' + google_domains.get(country.upper(), 'google.com')
# params & headers
lang_country = '%s-%s' % (language, country) # (en-US, en-EN, de-DE, de-AU, fr-FR ..)
ret_val['locale'] = locale
ret_val['subdomain'] = eng_traits.custom['supported_domains'].get(country.upper(), 'www.google.com')
# hl parameter:
# https://developers.google.com/custom-search/docs/xml_results#hlsp The
# Interface Language:
# The hl parameter specifies the interface language (host language) of
# your user interface. To improve the performance and the quality of your
# search results, you are strongly encouraged to set this parameter
# explicitly.
# https://developers.google.com/custom-search/docs/xml_results#hlsp
# The Interface Language:
# https://developers.google.com/custom-search/docs/xml_results_appendices#interfaceLanguages
ret_val['params']['hl'] = lang_list.get(lang_country, language)
ret_val['params']['hl'] = lang_code
# lr parameter:
# The lr (language restrict) parameter restricts search results to
@ -218,22 +183,72 @@ def get_lang_info(params, lang_list, custom_aliases, supported_any_language):
# https://developers.google.com/custom-search/docs/xml_results#lrsp
# Language Collection Values:
# https://developers.google.com/custom-search/docs/xml_results_appendices#languageCollections
#
# To select 'all' languages an empty 'lr' value is used.
#
# Different to other google services, Google Schloar supports to select more
# than one language. The languages are seperated by a pipe '|' (logical OR).
# By example: &lr=lang_zh-TW%7Clang_de selects articles written in
# traditional chinese OR german language.
if _any_language and supported_any_language:
ret_val['params']['lr'] = eng_lang
if sxng_locale == 'all':
ret_val['params']['lr'] = ''
# interpretation is left up to Google (based on whoogle)
#
# - add parameter ``source=lnt``
# - don't use parameter ``lr``
# - don't add a ``Accept-Language`` HTTP header.
# cr parameter:
# The cr parameter restricts search results to documents originating in a
# particular country.
# https://developers.google.com/custom-search/docs/xml_results#crsp
ret_val['params']['source'] = 'lnt'
ret_val['params']['cr'] = 'country' + country
if sxng_locale == 'all':
ret_val['params']['cr'] = ''
else:
# gl parameter: (mandatory by Geeogle News)
# The gl parameter value is a two-letter country code. For WebSearch
# results, the gl parameter boosts search results whose country of origin
# matches the parameter value. See the Country Codes section for a list of
# valid values.
# Specifying a gl parameter value in WebSearch requests should improve the
# relevance of results. This is particularly true for international
# customers and, even more specifically, for customers in English-speaking
# countries other than the United States.
# https://developers.google.com/custom-search/docs/xml_results#glsp
# restricts search results to documents written in a particular
# language.
ret_val['params']['lr'] = "lang_" + lang_list.get(lang_country, language)
ret_val['params']['gl'] = country
# ie parameter:
# The ie parameter sets the character encoding scheme that should be used
# to interpret the query string. The default ie value is latin1.
# https://developers.google.com/custom-search/docs/xml_results#iesp
ret_val['params']['ie'] = 'utf8'
# oe parameter:
# The oe parameter sets the character encoding scheme that should be used
# to decode the XML result. The default oe value is latin1.
# https://developers.google.com/custom-search/docs/xml_results#oesp
ret_val['params']['oe'] = 'utf8'
# num parameter:
# The num parameter identifies the number of search results to return.
# The default num value is 10, and the maximum value is 20. If you request
# more than 20 results, only 20 results will be returned.
# https://developers.google.com/custom-search/docs/xml_results#numsp
# HINT: seems to have no effect (tested in google WEB & Images)
# ret_val['params']['num'] = 20
# HTTP headers
ret_val['headers']['Accept'] = '*/*'
# Cookies
# - https://github.com/searxng/searxng/pull/1679#issuecomment-1235432746
# - https://github.com/searxng/searxng/issues/1555
ret_val['cookies']['CONSENT'] = "YES+"
return ret_val
@ -245,33 +260,34 @@ def detect_google_sorry(resp):
def request(query, params):
"""Google search request"""
# pylint: disable=line-too-long
offset = (params['pageno'] - 1) * 10
lang_info = get_lang_info(params, supported_languages, language_aliases, True)
additional_parameters = {}
if use_mobile_ui:
additional_parameters = {
'asearch': 'arc',
'async': 'use_ac:true,_fmt:prog',
}
google_info = get_google_info(params, traits)
# https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
query_url = (
'https://'
+ lang_info['subdomain']
+ google_info['subdomain']
+ '/search'
+ "?"
+ urlencode(
{
'q': query,
**lang_info['params'],
'ie': "utf8",
'oe': "utf8",
'start': offset,
**google_info['params'],
'filter': '0',
**additional_parameters,
'start': offset,
# 'vet': '12ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0QxK8CegQIARAC..i',
# 'ved': '2ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0Q_skCegQIARAG',
# 'cs' : 1,
# 'sa': 'N',
# 'yv': 3,
# 'prmd': 'vin',
# 'ei': 'GASaY6TxOcy_xc8PtYeY6AE',
# 'sa': 'N',
# 'sstk': 'AcOHfVkD7sWCSAheZi-0tx_09XDO55gTWY0JNq3_V26cNN-c8lfD45aZYPI8s_Bqp8s57AHz5pxchDtAGCA_cikAWSjy9kw3kgg'
# formally known as use_mobile_ui
'asearch': 'arc',
'async': UI_ASYNC,
}
)
)
@ -282,25 +298,45 @@ def request(query, params):
query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
params['url'] = query_url
params['cookies']['CONSENT'] = "YES+"
params['headers'].update(lang_info['headers'])
if use_mobile_ui:
params['headers']['Accept'] = '*/*'
else:
params['headers']['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
params['cookies'] = google_info['cookies']
params['headers'].update(google_info['headers'])
return params
# (function(){var s='data:image/jpeg;base64,/9j/4AAQ ...
# ... DX/Ff5XSpSgdU32xSlKDJ//9k\x3d';var ii=['dimg_21'];_setImagesSrc(ii,s);})();
RE_DATA_IMAGE = re.compile(r"'(data:image[^']*)'[^']*ii=\['([^']*)'\];_setImagesSrc")
def _parse_data_images(dom):
data_image_map = {}
for _script in eval_xpath_list(dom, "//script[@nonce]"):
script = _script.text
if not script:
continue
script = RE_DATA_IMAGE.search(script)
if not script:
continue
data_image_map[script.group(2)] = script.group(1).replace(r'\x3d', '=')
logger.debug('data:image objects --> %s', list(data_image_map.keys()))
return data_image_map
def response(resp):
"""Get response from google's search request"""
# pylint: disable=too-many-branches, too-many-statements
detect_google_sorry(resp)
results = []
# convert the text to dom
dom = html.fromstring(resp.text)
data_image_map = []
if '_fmt:html' in UI_ASYNC:
# in this format images are embedded by a bse64 encoded 'data:image'
data_image_map = _parse_data_images(dom)
# results --> answer
answer_list = eval_xpath(dom, '//div[contains(@class, "LGOjhe")]')
if answer_list:
@ -309,20 +345,9 @@ def response(resp):
else:
logger.debug("did not find 'answer'")
# results --> number_of_results
if not use_mobile_ui:
try:
_txt = eval_xpath_getindex(dom, '//div[@id="result-stats"]//text()', 0)
_digit = ''.join([n for n in _txt if n.isdigit()])
number_of_results = int(_digit)
results.append({'number_of_results': number_of_results})
except Exception as e: # pylint: disable=broad-except
logger.debug("did not 'number_of_results'")
logger.error(e, exc_info=True)
# parse results
for result in eval_xpath_list(dom, results_xpath):
for result in eval_xpath_list(dom, results_xpath): # pylint: disable=too-many-nested-blocks
# google *sections*
if extract_text(eval_xpath(result, g_section_with_header)):
@ -339,13 +364,31 @@ def response(resp):
url = eval_xpath_getindex(result, href_xpath, 0, None)
if url is None:
continue
content = extract_text(eval_xpath_getindex(result, content_xpath, 0, default=None), allow_none=True)
if content is None:
content = []
img_list = []
for content_feature in eval_xpath(result, content_xpath):
val = content_feature.attrib['data-content-feature']
if val in ['1', '2']:
txt = extract_text(content_feature, allow_none=True)
if txt:
content.append(txt)
elif '0' in val:
img = content_feature.xpath('.//img/@src')
if img:
img = img[0]
if img.startswith('data:image'):
img_id = content_feature.xpath('.//img/@id')
if img_id:
img = data_image_map.get(img_id[0])
img_list.append(img)
if not content:
logger.debug('ignoring item from the result_xpath list: missing content of title "%s"', title)
continue
logger.debug('add link to results: %s', title)
results.append({'url': url, 'title': title, 'content': content})
content = ' / '.join(content)
img_src = img_list[0] if img_list else None
results.append({'url': url, 'title': title, 'content': content, 'img_src': img_src})
except Exception as e: # pylint: disable=broad-except
logger.error(e, exc_info=True)
@ -361,15 +404,107 @@ def response(resp):
# get supported languages from their site
def _fetch_supported_languages(resp):
ret_val = {}
skip_countries = [
# official language of google-country not in google-languages
'AL', # Albanien (sq)
'AZ', # Aserbaidschan (az)
'BD', # Bangladesch (bn)
'BN', # Brunei Darussalam (ms)
'BT', # Bhutan (dz)
'ET', # Äthiopien (am)
'GE', # Georgien (ka, os)
'GL', # Grönland (kl)
'KH', # Kambodscha (km)
'LA', # Laos (lo)
'LK', # Sri Lanka (si, ta)
'ME', # Montenegro (sr)
'MK', # Nordmazedonien (mk, sq)
'MM', # Myanmar (my)
'MN', # Mongolei (mn)
'MV', # Malediven (dv) // dv_MV is unknown by babel
'MY', # Malaysia (ms)
'NP', # Nepal (ne)
'TJ', # Tadschikistan (tg)
'TM', # Turkmenistan (tk)
'UZ', # Usbekistan (uz)
]
def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True):
"""Fetch languages from Google."""
# pylint: disable=import-outside-toplevel, too-many-branches
engine_traits.custom['supported_domains'] = {}
resp = network.get('https://www.google.com/preferences')
if not resp.ok:
raise RuntimeError("Response from Google's preferences is not OK.")
dom = html.fromstring(resp.text)
radio_buttons = eval_xpath_list(dom, '//*[@id="langSec"]//input[@name="lr"]')
# supported language codes
for x in radio_buttons:
name = x.get("data-name")
code = x.get("value").split('_')[-1]
ret_val[code] = {"name": name}
lang_map = {'no': 'nb'}
for x in eval_xpath_list(dom, '//*[@id="langSec"]//input[@name="lr"]'):
return ret_val
eng_lang = x.get("value").split('_')[-1]
try:
locale = babel.Locale.parse(lang_map.get(eng_lang, eng_lang), sep='-')
except babel.UnknownLocaleError:
print("ERROR: %s -> %s is unknown by babel" % (x.get("data-name"), eng_lang))
continue
sxng_lang = language_tag(locale)
conflict = engine_traits.languages.get(sxng_lang)
if conflict:
if conflict != eng_lang:
print("CONFLICT: babel %s --> %s, %s" % (sxng_lang, conflict, eng_lang))
continue
engine_traits.languages[sxng_lang] = 'lang_' + eng_lang
# alias languages
engine_traits.languages['zh'] = 'lang_zh-CN'
# supported region codes
for x in eval_xpath_list(dom, '//*[@name="region"]/..//input[@name="region"]'):
eng_country = x.get("value")
if eng_country in skip_countries:
continue
if eng_country == 'ZZ':
engine_traits.all_locale = 'ZZ'
continue
sxng_locales = get_offical_locales(eng_country, engine_traits.languages.keys(), regional=True)
if not sxng_locales:
print("ERROR: can't map from google country %s (%s) to a babel region." % (x.get('data-name'), eng_country))
continue
for sxng_locale in sxng_locales:
engine_traits.regions[region_tag(sxng_locale)] = eng_country
# alias regions
engine_traits.regions['zh-CN'] = 'HK'
# supported domains
if add_domains:
resp = network.get('https://www.google.com/supported_domains')
if not resp.ok:
raise RuntimeError("Response from https://www.google.com/supported_domains is not OK.")
for domain in resp.text.split():
domain = domain.strip()
if not domain or domain in [
'.google.com',
]:
continue
region = domain.split('.')[-1].upper()
engine_traits.custom['supported_domains'][region] = 'www' + domain
if region == 'HK':
# There is no google.cn, we use .com.hk for zh-CN
engine_traits.custom['supported_domains']['CN'] = 'www' + domain

View file

@ -1,31 +1,38 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google images engine using the google
internal API used the Google Go Android app.
"""This is the implementation of the Google Images engine using the internal
Google API used by the Google Go Android app.
This internal API offer results in
- JSON (_fmt:json)
- Protobuf (_fmt:pb)
- Protobuf compressed? (_fmt:pc)
- HTML (_fmt:html)
- Protobuf encoded in JSON (_fmt:jspb).
- JSON (``_fmt:json``)
- Protobuf_ (``_fmt:pb``)
- Protobuf_ compressed? (``_fmt:pc``)
- HTML (``_fmt:html``)
- Protobuf_ encoded in JSON (``_fmt:jspb``).
.. _Protobuf: https://en.wikipedia.org/wiki/Protocol_Buffers
"""
from typing import TYPE_CHECKING
from urllib.parse import urlencode
from json import loads
from searx.engines.google import fetch_traits # pylint: disable=unused-import
from searx.engines.google import (
get_lang_info,
get_google_info,
time_range_dict,
detect_google_sorry,
)
# pylint: disable=unused-import
from searx.engines.google import supported_languages_url, _fetch_supported_languages
if TYPE_CHECKING:
import logging
from searx.enginelib.traits import EngineTraits
logger: logging.Logger
traits: EngineTraits
# pylint: enable=unused-import
# about
about = {
@ -40,7 +47,6 @@ about = {
# engine dependent config
categories = ['images', 'web']
paging = True
use_locale_domain = True
time_range_support = True
safesearch = True
send_accept_language_header = True
@ -51,20 +57,18 @@ filter_mapping = {0: 'images', 1: 'active', 2: 'active'}
def request(query, params):
"""Google-Image search request"""
lang_info = get_lang_info(params, supported_languages, language_aliases, False)
google_info = get_google_info(params, traits)
query_url = (
'https://'
+ lang_info['subdomain']
+ google_info['subdomain']
+ '/search'
+ "?"
+ urlencode(
{
'q': query,
'tbm': "isch",
**lang_info['params'],
'ie': "utf8",
'oe': "utf8",
**google_info['params'],
'asearch': 'isch',
'async': '_fmt:json,p:1,ijn:' + str(params['pageno']),
}
@ -77,9 +81,8 @@ def request(query, params):
query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
params['url'] = query_url
params['headers'].update(lang_info['headers'])
params['headers']['User-Agent'] = 'NSTN/3.60.474802233.release Dalvik/2.1.0 (Linux; U; Android 12; US) gzip'
params['headers']['Accept'] = '*/*'
params['cookies'] = google_info['cookies']
params['headers'].update(google_info['headers'])
return params
@ -111,7 +114,11 @@ def response(resp):
copyright_notice = item["result"].get('iptc', {}).get('copyright_notice')
if copyright_notice:
result_item['source'] += ' / ' + copyright_notice
result_item['source'] += ' | ' + copyright_notice
freshness_date = item["result"].get("freshness_date")
if freshness_date:
result_item['source'] += ' | ' + freshness_date
file_size = item.get('gsa', {}).get('file_size')
if file_size:

View file

@ -1,529 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google WEB engine using the google internal API used on the mobile UI.
This internal API offer results in
- JSON (_fmt:json)
- Protobuf (_fmt:pb)
- Protobuf compressed? (_fmt:pc)
- HTML (_fmt:html)
- Protobuf encoded in JSON (_fmt:jspb).
Some of this implementations are shared by other engines:
The implementation is shared by other engines:
- :ref:`google images internal engine`
- :ref:`google news internal engine`
- :ref:`google videos internal engine`
"""
from urllib.parse import urlencode
from json import loads, dumps
from datetime import datetime, timedelta
from dateutil.tz import tzoffset
from babel.dates import format_datetime
import babel
from searx.utils import html_to_text
# pylint: disable=unused-import
from searx.engines.google import (
get_lang_info,
detect_google_sorry,
supported_languages_url,
time_range_dict,
filter_mapping,
_fetch_supported_languages,
)
# pylint: enable=unused-import
# about
about = {
"website": 'https://www.google.com',
"wikidata_id": 'Q9366',
"official_api_documentation": 'https://developers.google.com/custom-search/',
"use_official_api": False,
"require_api_key": False,
"results": 'JSON',
}
# engine dependent config
categories = None
paging = True
time_range_support = True
safesearch = True
send_accept_language_header = True
# configuration
include_image_results = True
include_twitter_results = False
def get_query_url_general(query, lang_info, query_params):
return (
'https://'
+ lang_info['subdomain']
+ '/search'
+ "?"
+ urlencode(
{
'q': query,
**query_params,
}
)
)
def get_query_url_images(query, lang_info, query_params):
# https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
return (
'https://'
+ lang_info['subdomain']
+ '/search'
+ "?"
+ urlencode(
{
'q': query,
'tbm': "isch",
**query_params,
}
)
)
def get_query_url_news(query, lang_info, query_params):
return (
'https://'
+ lang_info['subdomain']
+ '/search'
+ "?"
+ urlencode(
{
'q': query,
'tbm': "nws",
**query_params,
}
)
)
CATEGORY_TO_GET_QUERY_URL = {
'general': get_query_url_general,
'images': get_query_url_images,
'news': get_query_url_news,
}
CATEGORY_RESULT_COUNT_PER_PAGE = {
'general': 10,
'images': 100,
'news': 10,
}
def request(query, params):
"""Google search request"""
result_count_per_page = CATEGORY_RESULT_COUNT_PER_PAGE[categories[0]] # pylint: disable=unsubscriptable-object
offset = (params['pageno'] - 1) * result_count_per_page
lang_info = get_lang_info(params, supported_languages, language_aliases, True)
query_params = {
**lang_info['params'],
'ie': 'utf8',
'oe': 'utf8',
'start': offset,
'num': result_count_per_page,
'filter': '0',
'asearch': 'arc',
'async': 'use_ac:true,_fmt:json',
}
get_query_url = CATEGORY_TO_GET_QUERY_URL[categories[0]] # pylint: disable=unsubscriptable-object
# https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
query_url = get_query_url(query, lang_info, query_params)
if params['time_range'] in time_range_dict:
query_url += '&' + urlencode({'tbs': 'qdr:' + time_range_dict[params['time_range']]})
if params['safesearch']:
query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
params['url'] = query_url
params['headers'].update(lang_info['headers'])
params['headers']['Accept'] = '*/*'
return params
def parse_search_feature_proto(search_feature_proto):
result_index = search_feature_proto["feature_metadata"]["logging_tree_ref_feature_metadata_extension"][
"result_index"
]
image_result_data = search_feature_proto["payload"]["image_result_data"]
title = html_to_text(image_result_data["page_title"])
content = html_to_text(image_result_data.get("snippet", ""))
url = image_result_data["coupled_url"]
img_src = image_result_data["url"]
thumbnail_src = "https://encrypted-tbn0.gstatic.com/images?q=tbn:" + image_result_data["encrypted_docid"]
img_format = f'{image_result_data["full_image_size"]["width"]} * {image_result_data["full_image_size"]["height"]}'
iptc = image_result_data.get("iptc_info", {}).get("iptc", {})
copyright_notice = iptc.get("copyright_notice")
creator = iptc.get("creator")
if isinstance(creator, list):
creator = ", ".join(creator)
if creator and copyright_notice and creator != copyright_notice:
author = f'{creator} ; {copyright_notice}'
else:
author = creator
return {
"template": "images.html",
"title": title,
"content": content,
"url": url,
"img_src": img_src,
"thumbnail_src": thumbnail_src,
'img_format': img_format,
"author": author,
"result_index": result_index,
}
class ParseResultGroupItem:
"""Parse result_group_search_feature_proto.search_feature_proto"""
def __init__(self, locale):
"""Parse one tier 1 result"""
self.locale = locale
self.item_types = {
"EXPLORE_UNIVERSAL_BLOCK": self.explore_universal_block,
"HOST_CLUSTER": self.host_cluster,
"NAVIGATIONAL_RESULT_GROUP": self.navigational_result_group,
"VIDEO_RESULT": self.video_result,
"VIDEO_UNIVERSAL_GROUP": self.video_universal_group,
"WEB_RESULT": self.web_result,
"WEB_ANSWERS_CARD_BLOCK": self.web_answers_card_block,
"IMAGE_RESULT_GROUP": self.image_result_group,
"TWITTER_RESULT_GROUP": self.twitter_result_group,
"NEWS_WHOLEPAGE": self.news_wholepage,
# WHOLEPAGE_PAGE_GROUP - found for keyword what is t in English language
# EXPLORE_UNIVERSAL_BLOCK
# TRAVEL_ANSWERS_RESULT
# TOP_STORIES : news.html template
# ONEBOX_BLOCK: for example, result of math forumla, weather ...
}
def explore_universal_block(self, item_to_parse):
results = []
for item in item_to_parse["explore_universal_unit_sfp_interface"]:
explore_unit = item["explore_block_extension"]["payload"]["explore_unit"]
if "lookup_key" in explore_unit:
results.append(
{'suggestion': html_to_text(explore_unit["lookup_key"]["aquarium_query"]), 'result_index': -1}
)
elif "label" in explore_unit:
results.append({'suggestion': html_to_text(explore_unit["label"]["text"]), 'result_index': -1})
return results
def host_cluster(self, item_to_parse):
results = []
for navigational_result in item_to_parse["results"]:
result_index = navigational_result["web_result_inner"]["feature_metadata"][
"logging_tree_ref_feature_metadata_extension"
]["result_index"]
url = None
title = None
content = None
for item in navigational_result["payload"]["sub_features"]["sub_feature"]:
payload = item["search_feature_proto"]["payload"]
if "primary_link" in payload:
primary_link = payload["primary_link"]
title = html_to_text(primary_link["title"])
url = primary_link["url"]
if "snippet_text" in payload:
content = html_to_text(payload["snippet_text"])
results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
# to do: parse additional results
return results
def navigational_result_group(self, item_to_parse):
results = []
navigational_result = item_to_parse["navigational_result"]
result_index = navigational_result["navigational_result_inner"]["feature_metadata"][
"logging_tree_ref_feature_metadata_extension"
]["result_index"]
url = None
title = None
content = None
for item in navigational_result["payload"]["sub_features"]["sub_feature"]:
payload = item["search_feature_proto"]["payload"]
if "primary_link" in payload:
primary_link = payload["primary_link"]
title = html_to_text(primary_link["title"])
url = primary_link["url"]
if "snippet_text" in payload:
content = html_to_text(payload["snippet_text"])
results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
for item in item_to_parse["megasitelinks"]["results"]:
result_data = item["payload"]["result_data"]
url = result_data["url"]
title = html_to_text(result_data["result_title"])
content = html_to_text(result_data["snippet"])
result_index = item["feature_metadata"]["logging_tree_ref_feature_metadata_extension"]["result_index"]
results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
return results
def video_result(self, item_to_parse):
result_index = item_to_parse["feature_metadata"]["logging_tree_ref_feature_metadata_extension"]["result_index"]
url = None
title = None
for item in item_to_parse["payload"]["sub_features"]["sub_feature"]:
payload = item["search_feature_proto"]["payload"]
if "primary_link" in payload:
primary_link = payload["primary_link"]
title = html_to_text(primary_link["title"])
url = primary_link["url"]
return [{'url': url, 'title': title, 'result_index': result_index}]
def video_universal_group(self, item_to_parse):
results = []
for item in item_to_parse["video_universal_group_element"]:
video_result = item["video_result"]
result_index = video_result["feature_metadata"]["logging_tree_ref_feature_metadata_extension"][
"result_index"
]
video_result_data = video_result["payload"]["video_result_data"]
url = video_result_data["url"]
title = html_to_text(video_result_data["title"])
content = html_to_text(video_result_data["snippet"])
results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
return results
def web_result(self, item_to_parse):
result_index = item_to_parse["web_result_inner"]["feature_metadata"][
"logging_tree_ref_feature_metadata_extension"
]["result_index"]
url = None
title = None
content = None
for item in item_to_parse["payload"]["sub_features"]["sub_feature"]:
payload = item["search_feature_proto"]["payload"]
if "primary_link" in payload:
primary_link = payload["primary_link"]
title = html_to_text(primary_link["title"])
url = primary_link["url"]
if "snippet_text" in payload:
content = html_to_text(payload["snippet_text"])
return [{'url': url, 'title': title, 'content': content, 'result_index': result_index}]
def web_answers_card_block(self, item_to_parse):
results = []
for item in item_to_parse["web_answers_card_block_elements"]:
answer = None
url = None
for item_webanswers in item["webanswers_container"]["webanswers_container_elements"]:
if (
"web_answers_result" in item_webanswers
and "text" in item_webanswers["web_answers_result"]["payload"]
):
answer = html_to_text(item_webanswers["web_answers_result"]["payload"]["text"])
if "web_answers_standard_result" in item_webanswers:
primary_link = item_webanswers["web_answers_standard_result"]["payload"]["standard_result"][
"primary_link"
]
url = primary_link["url"]
results.append({'answer': answer, 'url': url, 'result_index': -1})
return results
def twitter_result_group(self, item_to_parse):
results = []
if not include_twitter_results:
return results
result_index = item_to_parse["twitter_carousel_header"]["feature_metadata"][
"logging_tree_ref_feature_metadata_extension"
]["result_index"]
for item in item_to_parse["twitter_cards"]:
profile_payload = item["profile_link"]["payload"]["author"]
results.append(
{
"title": profile_payload["display_name"],
"url": profile_payload["profile_page_url"],
"result_index": result_index,
}
)
return results
def image_result_group(self, item_to_parse):
results = []
if not include_image_results:
return results
for item in item_to_parse["image_result_group_element"]:
results.append(parse_search_feature_proto(item["image_result"]))
return results
def news_wholepage(self, item_to_parse):
"""Parse a news search result"""
def iter_snippets():
"""Iterate over all the results, yield result_index, snippet to deal with nested structured"""
result_index = 0
for item in item_to_parse["element"]:
if "news_singleton_result_group" in item:
payload = item["news_singleton_result_group"]["result"]["payload"]["liquid_item_data"]
yield result_index, payload["article"]["stream_simplified_snippet"]
result_index += 1
continue
if "top_coverage" in item:
for element in item["top_coverage"]["element"]:
yield result_index, element["result"]["payload"]["liquid_item_data"]["article"][
"stream_simplified_snippet"
]
result_index += 1
continue
if "news_sports_hub_result_group" in item:
for element in item["news_sports_hub_result_group"]["element"]:
yield result_index, element["result"]["payload"]["liquid_item_data"]["article"][
"stream_simplified_snippet"
]
result_index += 1
continue
if "news_topic_hub_refinements_result_group" in item:
for ref_list in item["news_topic_hub_refinements_result_group"]["refinements"]["refinement_list"]:
for result in ref_list["results"]:
yield result_index, result["payload"]["liquid_item_data"]["article"][
"stream_simplified_snippet"
]
result_index += 1
continue
print("unknow news", item)
results = []
for result_index, snippet in iter_snippets():
publishedDate = snippet["date"]["timestamp"]
url = snippet["url"]["result_url"]
title = html_to_text(snippet["title"]["text"])
content = html_to_text(snippet["snippet"]["snippet"])
img_src = snippet.get("thumbnail_info", {}).get("sffe_50k_thumbnail_url")
results.append(
{
'url': url,
'title': title,
'content': content,
'img_src': img_src,
'publishedDate': datetime.fromtimestamp(publishedDate),
"result_index": result_index,
}
)
return results
class ParseResultItem: # pylint: disable=too-few-public-methods
"""Parse result_search_feature_proto.search_feature_proto"""
def __init__(self, locale):
self.locale = locale
self.item_types = {
"LOCAL_TIME": self.local_time,
"IMAGE_RESULT": self.image_result,
}
def local_time(self, item_to_parse):
"""Query like 'time in auckland' or 'time'
Note: localized_location reveal the location of the server
"""
seconds_utc = item_to_parse["payload"]["current_time"]["seconds_utc"]
timezones_0 = item_to_parse["payload"]["target_location"]["timezones"][0]
iana_timezone = timezones_0["iana_timezone"]
localized_location = timezones_0["localized_location"]
# parse timezone_abbrev_specific to create result_tz
# timezone_abbrev_specific for India is "UTC+5:30" and for New York is "UTC4"
# the values for offsets are respectively ["5", "30", "0"] and ["-4": "0"]
timezone_abbrev_specific = timezones_0["timezone_abbrev_specific"]
offsets = timezone_abbrev_specific.replace("UTC", "").replace("GMT", "").replace("", "-").split(":")
offsets.append("0")
result_tz = tzoffset(iana_timezone, timedelta(hours=int(offsets[0]), minutes=int(offsets[1])))
result_dt = datetime.fromtimestamp(seconds_utc, tz=result_tz)
result_dt_str = format_datetime(result_dt, 'long', tzinfo=result_tz, locale=self.locale)
answer = f"{result_dt_str} ( {localized_location} )"
return [{'answer': answer, 'result_index': -1}]
def image_result(self, item_to_parse):
return [parse_search_feature_proto(item_to_parse)]
def parse_web_results_list(json_data, locale):
results = []
tier_1_search_results = json_data["arcResponse"]["search_results"]["tier_1_search_results"]
results_list = tier_1_search_results["result_list"]["item"]
if "spell_suggestion" in tier_1_search_results:
spell_suggestion = tier_1_search_results["spell_suggestion"]
if "spell_column" in spell_suggestion:
for spell_suggestion in tier_1_search_results["spell_suggestion"]["spell_column"]:
for spell_link in spell_suggestion["spell_link"]:
results.append({'correction': spell_link["raw_corrected_query"], 'result_index': -1})
elif "full_page" in spell_suggestion:
results.append({'correction': spell_suggestion["full_page"]["raw_query"], 'result_index': -1})
parseResultItem = ParseResultItem(locale)
parseResultGroupItem = ParseResultGroupItem(locale)
for item in results_list:
if "result_group" in item:
result_item = item["result_group"]
result_item_extension = result_item["result_group_extension"]
elif "result" in item:
result_item = item["result"]
result_item_extension = result_item["result_extension"]
one_namespace_type = result_item_extension["one_namespace_type"]
if one_namespace_type in parseResultGroupItem.item_types and "result_group_search_feature_proto" in result_item:
search_feature_proto = result_item["result_group_search_feature_proto"]["search_feature_proto"]
results = results + parseResultGroupItem.item_types[one_namespace_type](search_feature_proto)
elif one_namespace_type in parseResultItem.item_types and "result_search_feature_proto" in result_item:
search_feature_proto = result_item["result_search_feature_proto"]["search_feature_proto"]
results = results + parseResultItem.item_types[one_namespace_type](search_feature_proto)
elif "result_group_search_feature_proto" in result_item:
print(dumps(one_namespace_type))
return sorted(results, key=lambda d: d['result_index'])
def response(resp):
"""Get response from google's search request"""
detect_google_sorry(resp)
language = resp.search_params["language"]
locale = 'en'
try:
locale = babel.Locale.parse(language, sep='-')
except babel.core.UnknownLocaleError:
pass
# only the 2nd line has the JSON content
response_2nd_line = resp.text.split("\n", 1)[1]
json_data = loads(response_2nd_line)
return parse_web_results_list(json_data, locale)

View file

@ -1,24 +1,40 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google news engine. The google news API
ignores some parameters from the common :ref:`google API`:
"""This is the implementation of the Google News engine.
- num_ : the number of search results is ignored
Google News has a different region handling compared to Google WEB.
- the ``ceid`` argument has to be set (:py:obj:`ceid_list`)
- the hl_ argument has to be set correctly (and different to Google WEB)
- the gl_ argument is mandatory
If one of this argument is not set correctly, the request is redirected to
CONSENT dialog::
https://consent.google.com/m?continue=
The google news API ignores some parameters from the common :ref:`google API`:
- num_ : the number of search results is ignored / there is no paging all
results for a query term are in the first response.
- save_ : is ignored / Google-News results are always *SafeSearch*
.. _hl: https://developers.google.com/custom-search/docs/xml_results#hlsp
.. _gl: https://developers.google.com/custom-search/docs/xml_results#glsp
.. _num: https://developers.google.com/custom-search/docs/xml_results#numsp
.. _save: https://developers.google.com/custom-search/docs/xml_results#safesp
"""
# pylint: disable=invalid-name
from typing import TYPE_CHECKING
import binascii
import re
from urllib.parse import urlencode
from base64 import b64decode
from lxml import html
import babel
from searx import locales
from searx.utils import (
eval_xpath,
eval_xpath_list,
@ -26,18 +42,19 @@ from searx.utils import (
extract_text,
)
# pylint: disable=unused-import
from searx.engines.google import fetch_traits as _fetch_traits # pylint: disable=unused-import
from searx.engines.google import (
supported_languages_url,
_fetch_supported_languages,
)
# pylint: enable=unused-import
from searx.engines.google import (
get_lang_info,
get_google_info,
detect_google_sorry,
)
from searx.enginelib.traits import EngineTraits
if TYPE_CHECKING:
import logging
logger: logging.Logger
traits: EngineTraits
# about
about = {
@ -49,70 +66,77 @@ about = {
"results": 'HTML',
}
# compared to other google engines google-news has a different time range
# support. The time range is included in the search term.
time_range_dict = {
'day': 'when:1d',
'week': 'when:7d',
'month': 'when:1m',
'year': 'when:1y',
}
# engine dependent config
categories = ['news']
paging = False
use_locale_domain = True
time_range_support = True
time_range_support = False
# Google-News results are always *SafeSearch*. Option 'safesearch' is set to
# False here, otherwise checker will report safesearch-errors::
#
# safesearch : results are identitical for safesearch=0 and safesearch=2
safesearch = False
send_accept_language_header = True
safesearch = True
# send_accept_language_header = True
def request(query, params):
"""Google-News search request"""
lang_info = get_lang_info(params, supported_languages, language_aliases, False)
sxng_locale = params.get('searxng_locale', 'en-US')
ceid = locales.get_engine_locale(sxng_locale, traits.custom['ceid'], default='US:en')
google_info = get_google_info(params, traits)
google_info['subdomain'] = 'news.google.com' # google news has only one domain
# google news has only one domain
lang_info['subdomain'] = 'news.google.com'
ceid_region, ceid_lang = ceid.split(':')
ceid_lang, ceid_suffix = (
ceid_lang.split('-')
+ [
None,
]
)[:2]
ceid = "%s:%s" % (lang_info['country'], lang_info['language'])
google_info['params']['hl'] = ceid_lang
# google news redirects en to en-US
if lang_info['params']['hl'] == 'en':
lang_info['params']['hl'] = 'en-US'
if ceid_suffix and ceid_suffix not in ['Hans', 'Hant']:
# Very special to google-news compared to other google engines, the time
# range is included in the search term.
if params['time_range']:
query += ' ' + time_range_dict[params['time_range']]
if ceid_region.lower() == ceid_lang:
google_info['params']['hl'] = ceid_lang + '-' + ceid_region
else:
google_info['params']['hl'] = ceid_lang + '-' + ceid_suffix
elif ceid_region.lower() != ceid_lang:
if ceid_region in ['AT', 'BE', 'CH', 'IL', 'SA', 'IN', 'BD', 'PT']:
google_info['params']['hl'] = ceid_lang
else:
google_info['params']['hl'] = ceid_lang + '-' + ceid_region
google_info['params']['lr'] = 'lang_' + ceid_lang.split('-')[0]
google_info['params']['gl'] = ceid_region
query_url = (
'https://'
+ lang_info['subdomain']
+ '/search'
+ "?"
+ urlencode({'q': query, **lang_info['params'], 'ie': "utf8", 'oe': "utf8", 'gl': lang_info['country']})
+ google_info['subdomain']
+ "/search?"
+ urlencode(
{
'q': query,
**google_info['params'],
}
)
# ceid includes a ':' character which must not be urlencoded
+ ('&ceid=%s' % ceid)
) # ceid includes a ':' character which must not be urlencoded
)
params['url'] = query_url
params['cookies']['CONSENT'] = "YES+"
params['headers'].update(lang_info['headers'])
params['headers']['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
params['cookies'] = google_info['cookies']
params['headers'].update(google_info['headers'])
return params
def response(resp):
"""Get response from google's search request"""
results = []
detect_google_sorry(resp)
# convert the text to dom
@ -152,8 +176,8 @@ def response(resp):
# The pub_date is mostly a string like 'yesertday', not a real
# timezone date or time. Therefore we can't use publishedDate.
pub_date = extract_text(eval_xpath(result, './article/div[1]/div[1]/time'))
pub_origin = extract_text(eval_xpath(result, './article/div[1]/div[1]/a'))
pub_date = extract_text(eval_xpath(result, './article//time'))
pub_origin = extract_text(eval_xpath(result, './article//a[@data-n-tid]'))
content = ' / '.join([x for x in [pub_origin, pub_date] if x])
@ -174,3 +198,127 @@ def response(resp):
# return results
return results
ceid_list = [
'AE:ar',
'AR:es-419',
'AT:de',
'AU:en',
'BD:bn',
'BE:fr',
'BE:nl',
'BG:bg',
'BR:pt-419',
'BW:en',
'CA:en',
'CA:fr',
'CH:de',
'CH:fr',
'CL:es-419',
'CN:zh-Hans',
'CO:es-419',
'CU:es-419',
'CZ:cs',
'DE:de',
'EG:ar',
'ES:es',
'ET:en',
'FR:fr',
'GB:en',
'GH:en',
'GR:el',
'HK:zh-Hant',
'HU:hu',
'ID:en',
'ID:id',
'IE:en',
'IL:en',
'IL:he',
'IN:bn',
'IN:en',
'IN:hi',
'IN:ml',
'IN:mr',
'IN:ta',
'IN:te',
'IT:it',
'JP:ja',
'KE:en',
'KR:ko',
'LB:ar',
'LT:lt',
'LV:en',
'LV:lv',
'MA:fr',
'MX:es-419',
'MY:en',
'NA:en',
'NG:en',
'NL:nl',
'NO:no',
'NZ:en',
'PE:es-419',
'PH:en',
'PK:en',
'PL:pl',
'PT:pt-150',
'RO:ro',
'RS:sr',
'RU:ru',
'SA:ar',
'SE:sv',
'SG:en',
'SI:sl',
'SK:sk',
'SN:fr',
'TH:th',
'TR:tr',
'TW:zh-Hant',
'TZ:en',
'UA:ru',
'UA:uk',
'UG:en',
'US:en',
'US:es-419',
'VE:es-419',
'VN:vi',
'ZA:en',
'ZW:en',
]
"""List of region/language combinations supported by Google News. Values of the
``ceid`` argument of the Google News REST API."""
_skip_values = [
'ET:en', # english (ethiopia)
'ID:en', # english (indonesia)
'LV:en', # english (latvia)
]
_ceid_locale_map = {'NO:no': 'nb-NO'}
def fetch_traits(engine_traits: EngineTraits):
_fetch_traits(engine_traits, add_domains=False)
engine_traits.custom['ceid'] = {}
for ceid in ceid_list:
if ceid in _skip_values:
continue
region, lang = ceid.split(':')
x = lang.split('-')
if len(x) > 1:
if x[1] not in ['Hant', 'Hans']:
lang = x[0]
sxng_locale = _ceid_locale_map.get(ceid, lang + '-' + region)
try:
locale = babel.Locale.parse(sxng_locale, sep='-')
except babel.UnknownLocaleError:
print("ERROR: %s -> %s is unknown by babel" % (ceid, sxng_locale))
continue
engine_traits.custom['ceid'][locales.region_tag(locale)] = ceid

View file

@ -1,19 +1,18 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""Google (Scholar)
"""This is the implementation of the Google Scholar engine.
For detailed description of the *REST-full* API see: `Query Parameter
Definitions`_.
.. _Query Parameter Definitions:
https://developers.google.com/custom-search/docs/xml_results#WebSearch_Query_Parameter_Definitions
Compared to other Google services the Scholar engine has a simple GET REST-API
and there does not exists `async` API. Even though the API slightly vintage we
can make use of the :ref:`google API` to assemble the arguments of the GET
request.
"""
# pylint: disable=invalid-name
from typing import TYPE_CHECKING
from typing import Optional
from urllib.parse import urlencode
from datetime import datetime
from typing import Optional
from lxml import html
from searx.utils import (
@ -23,19 +22,21 @@ from searx.utils import (
extract_text,
)
from searx.exceptions import SearxEngineCaptchaException
from searx.engines.google import fetch_traits # pylint: disable=unused-import
from searx.engines.google import (
get_lang_info,
get_google_info,
time_range_dict,
detect_google_sorry,
)
from searx.enginelib.traits import EngineTraits
# pylint: disable=unused-import
from searx.engines.google import (
supported_languages_url,
_fetch_supported_languages,
)
if TYPE_CHECKING:
import logging
# pylint: enable=unused-import
logger: logging.Logger
traits: EngineTraits
# about
about = {
@ -51,53 +52,62 @@ about = {
categories = ['science', 'scientific publications']
paging = True
language_support = True
use_locale_domain = True
time_range_support = True
safesearch = False
send_accept_language_header = True
def time_range_url(params):
"""Returns a URL query component for a google-Scholar time range based on
``params['time_range']``. Google-Scholar does only support ranges in years.
To have any effect, all the Searx ranges (*day*, *week*, *month*, *year*)
are mapped to *year*. If no range is set, an empty string is returned.
Example::
def time_range_args(params):
"""Returns a dictionary with a time range arguments based on
``params['time_range']``.
Google Scholar supports a detailed search by year. Searching by *last
month* or *last week* (as offered by SearXNG) is uncommon for scientific
publications and is not supported by Google Scholar.
To limit the result list when the users selects a range, all the SearXNG
ranges (*day*, *week*, *month*, *year*) are mapped to *year*. If no range
is set an empty dictionary of arguments is returned. Example; when
user selects a time range (current year minus one in 2022):
.. code:: python
{ 'as_ylo' : 2021 }
&as_ylo=2019
"""
# as_ylo=2016&as_yhi=2019
ret_val = ''
ret_val = {}
if params['time_range'] in time_range_dict:
ret_val = urlencode({'as_ylo': datetime.now().year - 1})
return '&' + ret_val
ret_val['as_ylo'] = datetime.now().year - 1
return ret_val
def detect_google_captcha(dom):
"""In case of CAPTCHA Google Scholar open its own *not a Robot* dialog and is
not redirected to ``sorry.google.com``.
"""
if eval_xpath(dom, "//form[@id='gs_captcha_f']"):
raise SearxEngineCaptchaException()
def request(query, params):
"""Google-Scholar search request"""
offset = (params['pageno'] - 1) * 10
lang_info = get_lang_info(params, supported_languages, language_aliases, False)
google_info = get_google_info(params, traits)
# subdomain is: scholar.google.xy
lang_info['subdomain'] = lang_info['subdomain'].replace("www.", "scholar.")
google_info['subdomain'] = google_info['subdomain'].replace("www.", "scholar.")
query_url = (
'https://'
+ lang_info['subdomain']
+ '/scholar'
+ "?"
+ urlencode({'q': query, **lang_info['params'], 'ie': "utf8", 'oe': "utf8", 'start': offset})
)
args = {
'q': query,
**google_info['params'],
'start': (params['pageno'] - 1) * 10,
'as_sdt': '2007', # include patents / to disable set '0,5'
'as_vis': '0', # include citations / to disable set '1'
}
args.update(time_range_args(params))
query_url += time_range_url(params)
params['url'] = query_url
params['cookies']['CONSENT'] = "YES+"
params['headers'].update(lang_info['headers'])
params['headers']['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
# params['google_subdomain'] = subdomain
params['url'] = 'https://' + google_info['subdomain'] + '/scholar?' + urlencode(args)
params['cookies'] = google_info['cookies']
params['headers'].update(google_info['headers'])
return params
@ -138,19 +148,15 @@ def parse_gs_a(text: Optional[str]):
def response(resp): # pylint: disable=too-many-locals
"""Get response from google's search request"""
"""Parse response from Google Scholar"""
results = []
detect_google_sorry(resp)
# which subdomain ?
# subdomain = resp.search_params.get('google_subdomain')
# convert the text to dom
dom = html.fromstring(resp.text)
detect_google_captcha(dom)
# parse results
for result in eval_xpath_list(dom, '//div[@data-cid]'):
for result in eval_xpath_list(dom, '//div[@data-rp]'):
title = extract_text(eval_xpath(result, './/h3[1]//a'))
@ -158,7 +164,7 @@ def response(resp): # pylint: disable=too-many-locals
# this is a [ZITATION] block
continue
pub_type = extract_text(eval_xpath(result, './/span[@class="gs_ct1"]'))
pub_type = extract_text(eval_xpath(result, './/span[@class="gs_ctg2"]'))
if pub_type:
pub_type = pub_type[1:-1].lower()

View file

@ -1,6 +1,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google videos engine.
"""This is the implementation of the Google Videos engine.
.. admonition:: Content-Security-Policy (CSP)
@ -14,9 +14,8 @@
"""
# pylint: disable=invalid-name
from typing import TYPE_CHECKING
import re
from urllib.parse import urlencode
from lxml import html
@ -27,20 +26,22 @@ from searx.utils import (
extract_text,
)
from searx.engines.google import fetch_traits # pylint: disable=unused-import
from searx.engines.google import (
get_lang_info,
get_google_info,
time_range_dict,
filter_mapping,
g_section_with_header,
title_xpath,
suggestion_xpath,
detect_google_sorry,
)
from searx.enginelib.traits import EngineTraits
# pylint: disable=unused-import
from searx.engines.google import supported_languages_url, _fetch_supported_languages
if TYPE_CHECKING:
import logging
# pylint: enable=unused-import
logger: logging.Logger
traits: EngineTraits
# about
about = {
@ -55,70 +56,32 @@ about = {
# engine dependent config
categories = ['videos', 'web']
paging = False
paging = True
language_support = True
use_locale_domain = True
time_range_support = True
safesearch = True
send_accept_language_header = True
RE_CACHE = {}
def _re(regexpr):
"""returns compiled regular expression"""
RE_CACHE[regexpr] = RE_CACHE.get(regexpr, re.compile(regexpr))
return RE_CACHE[regexpr]
def scrap_out_thumbs_src(dom):
ret_val = {}
thumb_name = 'dimg_'
for script in eval_xpath_list(dom, '//script[contains(., "google.ldi={")]'):
_script = script.text
# "dimg_35":"https://i.ytimg.c....",
_dimurl = _re("s='([^']*)").findall(_script)
for k, v in _re('(' + thumb_name + '[0-9]*)":"(http[^"]*)').findall(_script):
v = v.replace(r'\u003d', '=')
v = v.replace(r'\u0026', '&')
ret_val[k] = v
logger.debug("found %s imgdata for: %s", thumb_name, ret_val.keys())
return ret_val
def scrap_out_thumbs(dom):
"""Scrap out thumbnail data from <script> tags."""
ret_val = {}
thumb_name = 'dimg_'
for script in eval_xpath_list(dom, '//script[contains(., "_setImagesSrc")]'):
_script = script.text
# var s='data:image/jpeg;base64, ...'
_imgdata = _re("s='([^']*)").findall(_script)
if not _imgdata:
continue
# var ii=['dimg_17']
for _vidthumb in _re(r"(%s\d+)" % thumb_name).findall(_script):
# At least the equal sign in the URL needs to be decoded
ret_val[_vidthumb] = _imgdata[0].replace(r"\x3d", "=")
logger.debug("found %s imgdata for: %s", thumb_name, ret_val.keys())
return ret_val
def request(query, params):
"""Google-Video search request"""
lang_info = get_lang_info(params, supported_languages, language_aliases, False)
google_info = get_google_info(params, traits)
query_url = (
'https://'
+ lang_info['subdomain']
+ google_info['subdomain']
+ '/search'
+ "?"
+ urlencode({'q': query, 'tbm': "vid", **lang_info['params'], 'ie': "utf8", 'oe': "utf8"})
+ urlencode(
{
'q': query,
'tbm': "vid",
'start': 10 * params['pageno'],
**google_info['params'],
'asearch': 'arc',
'async': 'use_ac:true,_fmt:html',
}
)
)
if params['time_range'] in time_range_dict:
@ -127,9 +90,8 @@ def request(query, params):
query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
params['url'] = query_url
params['cookies']['CONSENT'] = "YES+"
params['headers'].update(lang_info['headers'])
params['headers']['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
params['cookies'] = google_info['cookies']
params['headers'].update(google_info['headers'])
return params
@ -141,43 +103,30 @@ def response(resp):
# convert the text to dom
dom = html.fromstring(resp.text)
vidthumb_imgdata = scrap_out_thumbs(dom)
thumbs_src = scrap_out_thumbs_src(dom)
logger.debug(str(thumbs_src))
# parse results
for result in eval_xpath_list(dom, '//div[contains(@class, "g ")]'):
# ignore google *sections*
if extract_text(eval_xpath(result, g_section_with_header)):
logger.debug("ignoring <g-section-with-header>")
img_src = eval_xpath_getindex(result, './/img/@src', 0, None)
if img_src is None:
continue
# ingnore articles without an image id / e.g. news articles
img_id = eval_xpath_getindex(result, './/g-img/img/@id', 0, default=None)
if img_id is None:
logger.error("no img_id found in item %s (news article?)", len(results) + 1)
continue
title = extract_text(eval_xpath_getindex(result, './/a/h3[1]', 0))
url = eval_xpath_getindex(result, './/a/h3[1]/../@href', 0)
img_src = vidthumb_imgdata.get(img_id, None)
if not img_src:
img_src = thumbs_src.get(img_id, "")
title = extract_text(eval_xpath_getindex(result, title_xpath, 0))
url = eval_xpath_getindex(result, './/div[@class="dXiKIc"]//a/@href', 0)
length = extract_text(eval_xpath(result, './/div[contains(@class, "P7xzyf")]/span/span'))
c_node = eval_xpath_getindex(result, './/div[@class="Uroaid"]', 0)
content = extract_text(c_node)
pub_info = extract_text(eval_xpath(result, './/div[@class="Zg1NU"]'))
pub_info = extract_text(eval_xpath(result, './/div[@class="P7xzyf"]'))
length = extract_text(eval_xpath(result, './/div[@class="J1mWY"]'))
results.append(
{
'url': url,
'title': title,
'content': content,
'length': length,
'author': pub_info,
'thumbnail': img_src,
'length': length,
'template': 'videos.html',
}
)