# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""This is the implementation of the google WEB engine using the google
internal API used by the mobile UI. This internal API offers results in

- JSON (_fmt:json)
- Protobuf (_fmt:pb)
- Protobuf compressed? (_fmt:pc)
- HTML (_fmt:html)
- Protobuf encoded in JSON (_fmt:jspb)

The implementation is shared by other engines:

- :ref:`google images internal engine`
- :ref:`google news internal engine`
- :ref:`google videos internal engine`

"""

from urllib.parse import urlencode
from json import loads
from datetime import datetime
from zoneinfo import ZoneInfo

from babel.dates import format_datetime
import babel

from searx.utils import html_to_text

# pylint: disable=unused-import
from searx.engines.google import (
    get_lang_info,
    detect_google_sorry,
    supported_languages_url,
    time_range_dict,
    filter_mapping,
    _fetch_supported_languages,
)

# pylint: enable=unused-import

# about
about = {
    "website": 'https://www.google.com',
    "wikidata_id": 'Q9366',
    "official_api_documentation": 'https://developers.google.com/custom-search/',
    "use_official_api": False,
    "require_api_key": False,
    "results": 'JSON',
}

# engine dependent config
categories = None
paging = True
time_range_support = True
safesearch = True
send_accept_language_header = True

# configuration
include_image_results = True
include_twitter_results = False


def get_query_url_general(query, lang_info, query_params):
    return (
        'https://'
        + lang_info['subdomain']
        + '/search'
        + "?"
        + urlencode(
            {
                'q': query,
                **query_params,
            }
        )
    )


def get_query_url_images(query, lang_info, query_params):
    # https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
    return (
        'https://'
        + lang_info['subdomain']
        + '/search'
        + "?"
        + urlencode(
            {
                'q': query,
                'tbm': "isch",
                **query_params,
            }
        )
    )
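
# A minimal sketch of what the two builders above produce, with illustrative
# values for ``lang_info`` and ``query_params`` (not necessarily the values
# the engine sends at runtime):
#
#   get_query_url_general('corona', {'subdomain': 'www.google.de'}, {'hl': 'de'})
#   -> 'https://www.google.de/search?q=corona&hl=de'
#
#   get_query_url_images('corona', {'subdomain': 'www.google.de'}, {'hl': 'de'})
#   -> 'https://www.google.de/search?q=corona&tbm=isch&hl=de'
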
CATEGORY_TO_GET_QUERY_URL = {
    'general': get_query_url_general,
    'images': get_query_url_images,
}


def request(query, params):
    """Google search request"""

    offset = (params['pageno'] - 1) * 10
    lang_info = get_lang_info(params, supported_languages, language_aliases, True)

    query_params = {
        **lang_info['params'],
        'ie': "utf8",
        'oe': "utf8",
        'num': 30,
        'start': offset,
        'filter': '0',
        'asearch': 'arc',
        'async': 'use_ac:true,_fmt:json',
    }

    get_query_url = CATEGORY_TO_GET_QUERY_URL[categories[0]]  # pylint: disable=unsubscriptable-object
    # https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium
    query_url = get_query_url(query, lang_info, query_params)

    if params['time_range'] in time_range_dict:
        query_url += '&' + urlencode({'tbs': 'qdr:' + time_range_dict[params['time_range']]})
    if params['safesearch']:
        query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]})
    params['url'] = query_url

    params['headers'].update(lang_info['headers'])
    params['headers']['Accept'] = '*/*'
    return params


def parse_search_feature_proto(search_feature_proto):
    result_index = search_feature_proto["feature_metadata"]["logging_tree_ref_feature_metadata_extension"][
        "result_index"
    ]
    image_result_data = search_feature_proto["payload"]["image_result_data"]
    title = html_to_text(image_result_data["page_title"])
    content = html_to_text(image_result_data.get("snippet", ""))
    url = image_result_data["coupled_url"]
    img_src = image_result_data["url"]
    thumbnail_src = "https://encrypted-tbn0.gstatic.com/images?q=tbn:" + image_result_data["encrypted_docid"]
    img_format = f'{image_result_data["full_image_size"]["width"]} * {image_result_data["full_image_size"]["height"]}'

    iptc = image_result_data.get("iptc_info", {}).get("iptc", {})
    copyright_notice = iptc.get("copyright_notice")
    creator = iptc.get("creator")
    if isinstance(creator, list):
        creator = ", ".join(creator)
    if creator and copyright_notice and creator != copyright_notice:
        author = f'{creator} ; {copyright_notice}'
    else:
        author = creator

    return {
        "template": "images.html",
        "title": title,
        "content": content,
        "url": url,
        "img_src": img_src,
        "thumbnail_src": thumbnail_src,
        'img_format': img_format,
        "author": author,
        "result_index": result_index,
    }
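
# For reference, an abridged sketch of the JSON consumed by
# parse_search_feature_proto() above. Only the key names are taken from the
# parsing code; the values are invented for illustration:
#
#   {
#     "feature_metadata": {
#       "logging_tree_ref_feature_metadata_extension": {"result_index": 3}
#     },
#     "payload": {
#       "image_result_data": {
#         "page_title": "...",
#         "snippet": "...",
#         "coupled_url": "https://example.org/page.html",
#         "url": "https://example.org/image.jpg",
#         "encrypted_docid": "...",
#         "full_image_size": {"width": 1024, "height": 768},
#         "iptc_info": {"iptc": {"copyright_notice": "...", "creator": ["..."]}}
#       }
#     }
#   }
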
class ParseResultGroupItem:
    """Parse result_group_search_feature_proto.search_feature_proto"""

    def __init__(self, locale):
        """Parse one tier 1 result"""
        self.locale = locale
        self.item_types = {
            "EXPLORE_UNIVERSAL_BLOCK": self.explore_universal_block,
            "HOST_CLUSTER": self.host_cluster,
            "NAVIGATIONAL_RESULT_GROUP": self.navigational_result_group,
            "VIDEO_RESULT": self.video_result,
            "VIDEO_UNIVERSAL_GROUP": self.video_universal_group,
            "WEB_RESULT": self.web_result,
            "WEB_ANSWERS_CARD_BLOCK": self.web_answers_card_block,
            "IMAGE_RESULT_GROUP": self.image_result_group,
            "TWITTER_RESULT_GROUP": self.twitter_result_group,
            # WHOLEPAGE_PAGE_GROUP - found for the keyword "what is t" in English
            # EXPLORE_UNIVERSAL_BLOCK
            # TRAVEL_ANSWERS_RESULT
            # TOP_STORIES : news.html template
            # ONEBOX_BLOCK: for example, the result of a math formula, weather ...
        }

    def explore_universal_block(self, item_to_parse):
        results = []
        for item in item_to_parse["explore_universal_unit_sfp_interface"]:
            explore_unit = item["explore_block_extension"]["payload"]["explore_unit"]
            if "lookup_key" in explore_unit:
                results.append(
                    {'suggestion': html_to_text(explore_unit["lookup_key"]["aquarium_query"]), 'result_index': -1}
                )
            elif "label" in explore_unit:
                results.append({'suggestion': html_to_text(explore_unit["label"]["text"]), 'result_index': -1})
        return results

    def host_cluster(self, item_to_parse):
        results = []
        for navigational_result in item_to_parse["results"]:
            result_index = navigational_result["web_result_inner"]["feature_metadata"][
                "logging_tree_ref_feature_metadata_extension"
            ]["result_index"]
            url = None
            title = None
            content = None
            for item in navigational_result["payload"]["sub_features"]["sub_feature"]:
                payload = item["search_feature_proto"]["payload"]
                if "primary_link" in payload:
                    primary_link = payload["primary_link"]
                    title = html_to_text(primary_link["title"])
                    url = primary_link["url"]
                if "snippet_text" in payload:
                    content = html_to_text(payload["snippet_text"])
            results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
        # TODO: parse additional results
        return results

    def navigational_result_group(self, item_to_parse):
        results = []
        navigational_result = item_to_parse["navigational_result"]
        result_index = navigational_result["navigational_result_inner"]["feature_metadata"][
            "logging_tree_ref_feature_metadata_extension"
        ]["result_index"]
        url = None
        title = None
        content = None
        for item in navigational_result["payload"]["sub_features"]["sub_feature"]:
            payload = item["search_feature_proto"]["payload"]
            if "primary_link" in payload:
                primary_link = payload["primary_link"]
                title = html_to_text(primary_link["title"])
                url = primary_link["url"]
            if "snippet_text" in payload:
                content = html_to_text(payload["snippet_text"])
        results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})

        for item in item_to_parse["megasitelinks"]["results"]:
            result_data = item["payload"]["result_data"]
            url = result_data["url"]
            title = html_to_text(result_data["result_title"])
            content = html_to_text(result_data["snippet"])
            result_index = item["feature_metadata"]["logging_tree_ref_feature_metadata_extension"]["result_index"]
            results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
        return results

    def video_result(self, item_to_parse):
        result_index = item_to_parse["feature_metadata"]["logging_tree_ref_feature_metadata_extension"]["result_index"]
        url = None
        title = None
        for item in item_to_parse["payload"]["sub_features"]["sub_feature"]:
            payload = item["search_feature_proto"]["payload"]
            if "primary_link" in payload:
                primary_link = payload["primary_link"]
                title = html_to_text(primary_link["title"])
                url = primary_link["url"]
        return [{'url': url, 'title': title, 'result_index': result_index}]

    def video_universal_group(self, item_to_parse):
        results = []
        for item in item_to_parse["video_universal_group_element"]:
            video_result = item["video_result"]
            result_index = video_result["feature_metadata"]["logging_tree_ref_feature_metadata_extension"][
                "result_index"
            ]
            video_result_data = video_result["payload"]["video_result_data"]
            url = video_result_data["url"]
            title = html_to_text(video_result_data["title"])
            content = html_to_text(video_result_data["snippet"])
            results.append({'url': url, 'title': title, 'content': content, 'result_index': result_index})
        return results

    def web_result(self, item_to_parse):
        result_index = item_to_parse["web_result_inner"]["feature_metadata"][
            "logging_tree_ref_feature_metadata_extension"
        ]["result_index"]
        url = None
        title = None
        content = None
        for item in item_to_parse["payload"]["sub_features"]["sub_feature"]:
            payload = item["search_feature_proto"]["payload"]
            if "primary_link" in payload:
                primary_link = payload["primary_link"]
                title = html_to_text(primary_link["title"])
                url = primary_link["url"]
            if "snippet_text" in payload:
                content = html_to_text(payload["snippet_text"])
        return [{'url': url, 'title': title, 'content': content, 'result_index': result_index}]

    def web_answers_card_block(self, item_to_parse):
        results = []
        for item in item_to_parse["web_answers_card_block_elements"]:
            answer = None
            url = None
            for item_webanswers in item["webanswers_container"]["webanswers_container_elements"]:
                if (
                    "web_answers_result" in item_webanswers
                    and "text" in item_webanswers["web_answers_result"]["payload"]
                ):
                    answer = html_to_text(item_webanswers["web_answers_result"]["payload"]["text"])
                if "web_answers_standard_result" in item_webanswers:
                    primary_link = item_webanswers["web_answers_standard_result"]["payload"]["standard_result"][
                        "primary_link"
                    ]
                    url = primary_link["url"]
            results.append({'answer': answer, 'url': url, 'result_index': -1})
        return results

    def twitter_result_group(self, item_to_parse):
        results = []
        if not include_twitter_results:
            return results

        result_index = item_to_parse["twitter_carousel_header"]["feature_metadata"][
            "logging_tree_ref_feature_metadata_extension"
        ]["result_index"]

        for item in item_to_parse["twitter_cards"]:
            profile_payload = item["profile_link"]["payload"]["author"]
            results.append(
                {
                    "title": profile_payload["display_name"],
                    "url": profile_payload["profile_page_url"],
                    "result_index": result_index,
                }
            )
        return results

    def image_result_group(self, item_to_parse):
        results = []
        if not include_image_results:
            return results
        for item in item_to_parse["image_result_group_element"]:
            results.append(parse_search_feature_proto(item["image_result"]))
        return results
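
# host_cluster(), navigational_result_group() and web_result() above all walk
# the same nested structure. An abridged sketch, with the key names taken from
# the parsing code and the values invented for illustration:
#
#   "payload": {
#     "sub_features": {
#       "sub_feature": [
#         {"search_feature_proto": {"payload": {
#             "primary_link": {"title": "...", "url": "https://example.org/"}}}},
#         {"search_feature_proto": {"payload": {"snippet_text": "..."}}}
#       ]
#     }
#   }
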
class ParseResultItem:  # pylint: disable=too-few-public-methods
    """Parse result_search_feature_proto.search_feature_proto"""

    def __init__(self, locale):
        self.locale = locale
        self.item_types = {
            "LOCAL_TIME": self.local_time,
            "IMAGE_RESULT": self.image_result,
        }

    def local_time(self, item_to_parse):
        """Query like 'time in auckland' or 'time'

        Note: localized_location reveals the location of the server
        """
        seconds_utc = item_to_parse["payload"]["current_time"]["seconds_utc"]
        timezones_0 = item_to_parse["payload"]["target_location"]["timezones"][0]
        iana_timezone = timezones_0["iana_timezone"]
        localized_location = timezones_0["localized_location"]
        result_tz = ZoneInfo(iana_timezone)
        result_dt = datetime.fromtimestamp(seconds_utc, tz=result_tz)
        result_dt_str = format_datetime(result_dt, 'long', tzinfo=result_tz, locale=self.locale)
        answer = f"{result_dt_str} ( {localized_location} )"
        return [{'answer': answer, 'result_index': -1}]

    def image_result(self, item_to_parse):
        return [parse_search_feature_proto(item_to_parse)]
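
# Illustrative outcome of local_time() for a query like "time in auckland",
# with assumed input values; the exact string depends on babel's locale data:
#
#   seconds_utc = 1625097600, iana_timezone = "Pacific/Auckland"
#   -> answer = "July 1, 2021 at 12:00:00 PM NZST ( Auckland )"
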
def parse_web_results_list(json_data, locale):
    results = []

    tier_1_search_results = json_data["arcResponse"]["search_results"]["tier_1_search_results"]
    results_list = tier_1_search_results["result_list"]["item"]

    if "spell_suggestion" in tier_1_search_results:
        spell_suggestion = tier_1_search_results["spell_suggestion"]
        if "spell_column" in spell_suggestion:
            for spell_column in spell_suggestion["spell_column"]:
                for spell_link in spell_column["spell_link"]:
                    results.append({'correction': spell_link["raw_corrected_query"], 'result_index': -1})
        elif "full_page" in spell_suggestion:
            results.append({'correction': spell_suggestion["full_page"]["raw_query"], 'result_index': -1})

    parseResultItem = ParseResultItem(locale)
    parseResultGroupItem = ParseResultGroupItem(locale)
    for item in results_list:
        if "result_group" in item:
            result_item = item["result_group"]
            result_item_extension = result_item["result_group_extension"]
        elif "result" in item:
            result_item = item["result"]
            result_item_extension = result_item["result_extension"]
        else:
            continue
        one_namespace_type = result_item_extension["one_namespace_type"]
        if one_namespace_type in parseResultGroupItem.item_types and "result_group_search_feature_proto" in result_item:
            search_feature_proto = result_item["result_group_search_feature_proto"]["search_feature_proto"]
            results = results + parseResultGroupItem.item_types[one_namespace_type](search_feature_proto)
        elif one_namespace_type in parseResultItem.item_types and "result_search_feature_proto" in result_item:
            search_feature_proto = result_item["result_search_feature_proto"]["search_feature_proto"]
            results = results + parseResultItem.item_types[one_namespace_type](search_feature_proto)
        elif "result_group_search_feature_proto" in result_item:
            # a result type this engine does not (yet) parse
            logger.debug("unknown one_namespace_type: %s", one_namespace_type)
    return sorted(results, key=lambda d: d['result_index'])


def response(resp):
    """Get response from google's search request"""
    detect_google_sorry(resp)

    language = resp.search_params["language"]
    locale = 'en'
    try:
        locale = babel.Locale.parse(language, sep='-')
    except babel.core.UnknownLocaleError:
        pass

    # only the 2nd line contains the JSON content
    response_2nd_line = resp.text.split("\n", 1)[1]
    json_data = loads(response_2nd_line)

    return parse_web_results_list(json_data, locale)
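
# Abridged sketch of the raw body consumed by response() above: the first
# line is a non-JSON prefix and is discarded, only the second line is parsed
# (key names from the code, nesting abridged):
#
#   <non-JSON first line>
#   {"arcResponse": {"search_results": {"tier_1_search_results": {
#       "result_list": {"item": [...]}, "spell_suggestion": {...}}}}}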