diff --git a/searx/webapp.py b/searx/webapp.py index cae30d4bb..f018dd5bb 100755 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -3219,6 +3219,3305 @@ if not werkzeug_reloader or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_ search_initialize(enable_checker=True, check_network=True, enable_metrics=settings['general']['enable_metrics']) +class DFA: + def __init__(self, path: str = None): + self.ban_words_set = set() + self.ban_words_list = list() + self.ban_words_dict = dict() + if not path: + self.path = 'keywords' + else: + self.path = path + self.get_words() + + # 获取敏感词列表 + def get_words(self): + with open(self.path, 'r', encoding='utf-8-sig') as f: + for s in f: + if s.find('\\r'): + s = s.replace('\r', '') + s = s.replace('\n', '') + s = s.strip() + if len(s) == 0: + continue + if str(s) and s not in self.ban_words_set: + self.ban_words_set.add(s) + self.ban_words_list.append(str(s)) + sentence = pycorrector.simplified2traditional(s) + if sentence != s: + self.ban_words_set.add(sentence) + self.ban_words_list.append(str(sentence)) + self.add_hash_dict(self.ban_words_list) + + def change_words(self, path): + self.ban_words_list.clear() + self.ban_words_dict.clear() + self.ban_words_set.clear() + self.path = path + self.get_words() + + # 将敏感词列表转换为DFA字典序 + def add_hash_dict(self, new_list): + for x in new_list: + self.add_new_word(x) + + # 添加单个敏感词 + def add_new_word(self, new_word): + new_word = str(new_word) + # print(new_word) + now_dict = self.ban_words_dict + i = 0 + for x in new_word: + if x not in now_dict: + x = str(x) + new_dict = dict() + new_dict['is_end'] = False + now_dict[x] = new_dict + now_dict = new_dict + else: + now_dict = now_dict[x] + if i == len(new_word) - 1: + now_dict['is_end'] = True + i += 1 + + # 寻找第一次出现敏感词的位置 + def find_illegal(self, _str): + now_dict = self.ban_words_dict + i = 0 + start_word = -1 + is_start = True # 判断是否是一个敏感词的开始 + while i < len(_str): + if _str[i] not in now_dict: + if is_start is True: + i += 1 + continue + i 
= start_word + 1 + start_word = -1 + is_start = True + now_dict = self.ban_words_dict + else: + if is_star#!/usr/bin/env python +# SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pyright: basic +"""WebbApp + +""" +# pylint: disable=use-dict-literal + +import hashlib +import hmac +import json +import os +import sys +import base64 +import requests +import markdown +import re +import datetime +from textrank4zh import TextRank4Keyword, TextRank4Sentence +import pycorrector + +from timeit import default_timer +from html import escape +from io import StringIO +import typing +from typing import List, Dict, Iterable + +import urllib +import urllib.parse +from urllib.parse import urlencode, unquote + +import httpx + +from pygments import highlight +from pygments.lexers import get_lexer_by_name +from pygments.formatters import HtmlFormatter # pylint: disable=no-name-in-module + +import flask + +from flask import ( + Flask, + render_template, + url_for, + make_response, + redirect, + send_from_directory, +) +from flask.wrappers import Response +from flask.json import jsonify + +from flask_babel import ( + Babel, + gettext, + format_decimal, +) + +from searx import ( + logger, + get_setting, + settings, + searx_debug, +) + +from searx import infopage +from searx.data import ENGINE_DESCRIPTIONS +from searx.results import Timing, UnresponsiveEngine +from searx.settings_defaults import OUTPUT_FORMATS +from searx.settings_loader import get_default_settings_path +from searx.exceptions import SearxParameterException +from searx.engines import ( + OTHER_CATEGORY, + categories, + engines, + engine_shortcuts, +) +from searx.webutils import ( + UnicodeWriter, + highlight_content, + get_static_files, + get_result_templates, + get_themes, + prettify_url, + new_hmac, + is_hmac_of, + is_flask_run_cmdline, + group_engines_in_tab, + searxng_l10n_timespan, +) +from searx.webadapter import ( + get_search_query_from_webapp, + get_selected_categories, +) +from searx.utils import ( + 
html_to_text, + gen_useragent, + dict_subset, + match_language, +) +from searx.version import VERSION_STRING, GIT_URL, GIT_BRANCH +from searx.query import RawTextQuery +from searx.plugins import Plugin, plugins, initialize as plugin_initialize +from searx.plugins.oa_doi_rewrite import get_doi_resolver +from searx.preferences import ( + Preferences, + ValidationException, +) +from searx.answerers import ( + answerers, + ask, +) +from searx.metrics import ( + get_engines_stats, + get_engine_errors, + get_reliabilities, + histogram, + counter, +) +from searx.flaskfix import patch_application + +from searx.locales import ( + LOCALE_NAMES, + RTL_LOCALES, + localeselector, + locales_initialize, +) + +# renaming names from searx imports ... +from searx.autocomplete import search_autocomplete, backends as autocomplete_backends +from searx.languages import language_codes as languages +from searx.redisdb import initialize as redis_initialize +from searx.search import SearchWithPlugins, initialize as search_initialize +from searx.network import stream as http_stream, set_context_network_name +from searx.search.checker import get_result as checker_get_result + +logger = logger.getChild('webapp') + +# check secret_key +if not searx_debug and settings['server']['secret_key'] == 'ultrasecretkey': + logger.error('server.secret_key is not changed. 
# check secret_key
# Outside debug mode, refuse to start while the placeholder secret is still configured.
if not searx_debug and settings['server']['secret_key'] == 'ultrasecretkey':
    logger.error('server.secret_key is not changed. Please use something else instead of ultrasecretkey.')
    sys.exit(1)

# about static
logger.debug('static directory is %s', settings['ui']['static_path'])
static_files = get_static_files(settings['ui']['static_path'])

# about templates
logger.debug('templates directory is %s', settings['ui']['templates_path'])
default_theme = settings['ui']['default_theme']
templates_path = settings['ui']['templates_path']
themes = get_themes(templates_path)
result_templates = get_result_templates(templates_path)

# NOTE(review): each value looks like (reverse order?, attribute name, default value)
# for sorting the engine statistics -- confirm against the code that consumes it.
STATS_SORT_PARAMETERS = {
    'name': (False, 'name', ''),
    'score': (True, 'score_per_result', 0),
    'result_count': (True, 'result_count', 0),
    'time': (False, 'total', 0),
    'reliability': (False, 'reliability', 100),
}

# Flask app
app = Flask(__name__, static_folder=settings['ui']['static_path'], template_folder=templates_path)

app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
app.jinja_env.add_extension('jinja2.ext.loopcontrols')  # pylint: disable=no-member
app.jinja_env.filters['group_engines_in_tab'] = group_engines_in_tab  # pylint: disable=no-member
app.secret_key = settings['server']['secret_key']

# Messages shared by several entries of the mapping below.
timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")

# Maps an exception class name (dotted path, or None) to a user-facing message.
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,  # for Python > 3.7
    'ssl.CertificateError': ssl_cert_error_text,  # for Python 3.7
}
class ExtendedRequest(flask.Request):
    """This class is never initialized and only used for type checking."""

    preferences: Preferences
    errors: List[str]
    user_plugins: List[Plugin]
    form: Dict[str, str]
    start_time: float
    render_time: float
    timings: List[Timing]


request = typing.cast(ExtendedRequest, flask.request)


def get_locale():
    """Return the locale picked by ``localeselector()`` and log it with the request URL."""
    locale = localeselector()
    logger.debug("%s uses locale `%s`", urllib.parse.quote(request.url), locale)
    return locale


babel = Babel(app, locale_selector=get_locale)


def _get_browser_language(req, lang_list):
    """Return the first ``Accept-Language`` entry of ``req`` that matches ``lang_list``.

    Each comma-separated entry is stripped of its ``;q=...`` parameters and of
    surrounding whitespace (HTTP allows optional whitespace after the commas,
    e.g. ``"en-US, de;q=0.8"``), then normalised to ``xx-YY`` before it is
    handed to ``match_language``.  Falls back to ``'en'`` when nothing matches.
    """
    for lang in req.headers.get("Accept-Language", "en").split(","):
        # drop the quality value / parameters and the optional whitespace
        lang = lang.split(';')[0].strip()
        if not lang:
            # empty list element (", ," or an empty header value)
            continue
        if '-' in lang:
            lang_parts = lang.split('-')
            lang = "{}-{}".format(lang_parts[0], lang_parts[-1].upper())
        locale = match_language(lang, lang_list, fallback=None)
        if locale is not None:
            return locale
    return 'en'


def _get_locale_rfc5646(locale):
    """Return ``locale`` reduced to its first and last ``-`` separated parts.

    Chrom* browsers don't detect the language when there is a subtag (ie a territory).
    For example "zh-TW" is detected but not "zh-Hant-TW".
    This function returns a locale without the subtag.

    The first part is lower-cased and the last part upper-cased; for a locale
    without any ``-`` both parts are the same string (``"en"`` -> ``"en-EN"``).
    """
    parts = locale.split('-')
    return parts[0].lower() + '-' + parts[-1].upper()
def _highlight_codepart(code, lexer, first_line):
    """Render one contiguous code part as HTML, numbering its lines from ``first_line``."""
    formatter = HtmlFormatter(linenos='inline', linenostart=first_line, cssclass="code-highlight")
    return highlight(code, lexer, formatter)


# code-highlighter
@app.template_filter('code_highlighter')
def code_highlighter(codelines, language=None):
    """Jinja filter: highlight ``codelines`` and return the resulting HTML.

    :param codelines: iterable of ``(line_number, code)`` pairs.  Consecutive
        line numbers are rendered as one code part; a gap starts a new part.
    :param language: lexer name understood by pygments; falls back to
        ``'text'`` when empty or unknown.
    :return: concatenated HTML of all code parts (``''`` for empty input).
    """
    if not language:
        language = 'text'

    try:
        # find lexer by programming language
        lexer = get_lexer_by_name(language, stripall=True)

    except Exception as e:  # pylint: disable=broad-except
        logger.exception(e, exc_info=True)
        # if lexer is not found, using default one
        lexer = get_lexer_by_name('text', stripall=True)

    html_code = ''
    tmp_code = ''
    last_line = None
    line_code_start = None

    # parse lines
    for line, code in codelines:
        if last_line is None:
            # very first line.  Compare against None explicitly: a line number
            # of 0 is a valid "previous line" and must not reset the start.
            line_code_start = line
        elif last_line + 1 != line:
            # new codeblock is detected: highlight last codepart
            html_code += _highlight_codepart(tmp_code, lexer, line_code_start)

            # reset conditions for next codepart
            tmp_code = ''
            line_code_start = line

        # add codepart
        tmp_code += code + '\n'

        # update line
        last_line = line

    if line_code_start is None:
        # no input lines at all: there is nothing to number or highlight
        return html_code

    # highlight last codepart
    html_code += _highlight_codepart(tmp_code, lexer, line_code_start)

    return html_code


def get_result_template(theme_name: str, template_name: str):
    """Return the theme specific result template path if the theme ships one,
    otherwise the generic ``result_templates/<template_name>`` path."""
    themed_path = theme_name + '/result_templates/' + template_name
    if themed_path in result_templates:
        return themed_path
    return 'result_templates/' + template_name


def custom_url_for(endpoint: str, **values):
    """``url_for`` replacement used by the templates.

    * ``static``: when ``filename`` is not a known static file, the file of the
      user's current theme is tried; if ``ui.static_use_hash`` is set and a
      hash is known, it is appended as query string.
    * ``info``: when no ``locale`` is given, the user's locale is used, or the
      default locale of the info pages when the page does not exist in it.
    """
    suffix = ""
    if endpoint == 'static' and values.get('filename'):
        file_hash = static_files.get(values['filename'])
        if not file_hash:
            # try file in the current theme
            theme_name = request.preferences.get_value('theme')
            filename_with_theme = "themes/{}/{}".format(theme_name, values['filename'])
            file_hash = static_files.get(filename_with_theme)
            if file_hash:
                values['filename'] = filename_with_theme
        if get_setting('ui.static_use_hash') and file_hash:
            suffix = "?" + file_hash
    if endpoint == 'info' and 'locale' not in values:
        locale = request.preferences.get_value('locale')
        if _INFO_PAGES.get_page(values['pagename'], locale) is None:
            locale = _INFO_PAGES.locale_default
        values['locale'] = locale
    return url_for(endpoint, **values) + suffix
def morty_proxify(url: str):
    """Return ``url`` rewritten to go through the configured result proxy (morty).

    Protocol-relative URLs (``//host/...``) are made absolute with ``https:``.
    When ``result_proxy.url`` is not set the (absolute) URL is returned as is.
    When ``result_proxy.key`` is set, a ``mortyhash`` HMAC-SHA256 of the URL is
    added; the key may be configured as ``bytes`` or ``str``.
    """
    if url.startswith('//'):
        url = 'https:' + url

    if not settings['result_proxy']['url']:
        return url

    url_params = dict(mortyurl=url)

    key = settings['result_proxy']['key']
    if key:
        if isinstance(key, str):
            # hmac.new() only accepts bytes-like keys
            key = key.encode()
        url_params['mortyhash'] = hmac.new(key, url.encode(), hashlib.sha256).hexdigest()

    return '{0}?{1}'.format(settings['result_proxy']['url'], urlencode(url_params))


def image_proxify(url: str):
    """Return the URL a template should use to display the image at ``url``.

    * image proxy disabled in the preferences: the URL itself,
    * ``data:image/...`` URL: the URL itself when it is a base64 payload of a
      whitelisted image type, otherwise ``None``,
    * result proxy configured: the morty URL,
    * otherwise: the local ``image_proxy`` endpoint with an HMAC of the URL.
    """
    if url.startswith('//'):
        url = 'https:' + url

    if not request.preferences.get_value('image_proxy'):
        return url

    if url.startswith('data:image/'):
        # 50 is an arbitrary number to get only the beginning of the image.
        partial_base64 = url[len('data:image/') : 50].split(';')
        if (
            len(partial_base64) == 2
            and partial_base64[0] in ['gif', 'png', 'jpeg', 'pjpeg', 'webp', 'tiff', 'bmp']
            and partial_base64[1].startswith('base64,')
        ):
            return url
        return None

    if settings['result_proxy']['url']:
        return morty_proxify(url)

    h = new_hmac(settings['server']['secret_key'], url.encode())

    return '{0}?{1}'.format(url_for('image_proxy'), urlencode(dict(url=url.encode(), h=h)))
def get_translations():
    """Return the translated strings the client side scripts need."""
    translations = {}
    # when there is autocompletion
    translations['no_item_found'] = gettext('No item found')
    # /preferences: the source of the engine description (wikipedata, wikidata, website)
    translations['Source'] = gettext('Source')
    # infinite scroll
    translations['error_loading_next_page'] = gettext('Error loading the next page')
    return translations


def _get_enable_categories(all_categories: Iterable[str]):
    """Return the entries of ``all_categories`` (in their original order) for
    which at least one engine is not disabled in the user's preferences."""
    disabled_engines = request.preferences.engines.get_disabled()
    active = set()
    for engine_name in engines:
        for category in engines[engine_name].categories:
            if (engine_name, category) not in disabled_engines:
                active.add(category)
    return [name for name in all_categories if name in active]


def get_pretty_url(parsed_url: urllib.parse.ParseResult):
    """Return ``[scheme://netloc, path]`` for display.

    One trailing slash is removed from the path, every remaining ``/`` is
    replaced by `` › `` and percent-escapes are decoded afterwards.
    """
    raw_path = parsed_url.path
    if raw_path.endswith('/'):
        raw_path = raw_path[:-1]
    origin = parsed_url.scheme + "://" + parsed_url.netloc
    return [origin, unquote(raw_path.replace("/", " › "))]


def get_client_settings():
    """Collect the settings that are handed to the client side scripts."""
    prefs = request.preferences
    plugin_choices = prefs.plugins.choices
    client_settings = {}
    client_settings['autocomplete_provider'] = prefs.get_value('autocomplete')
    client_settings['autocomplete_min'] = get_setting('search.autocomplete_min')
    client_settings['http_method'] = prefs.get_value('method')
    client_settings['infinite_scroll'] = prefs.get_value('infinite_scroll')
    client_settings['translations'] = get_translations()
    client_settings['search_on_category_select'] = plugin_choices['searx.plugins.search_on_category_select']
    client_settings['hotkeys'] = plugin_choices['searx.plugins.vim_hotkeys']
    client_settings['theme_static_path'] = custom_url_for('static', filename='themes/simple')
    return client_settings
def render(template_name: str, **kwargs):
    """Render ``template_name`` of the user's theme.

    ``kwargs`` are passed to the template after being completed with the
    values every template expects: client settings, request data, user
    preferences, instance settings, URL helpers and the JS/CSS dependencies
    of the enabled plugins.  The time spent in ``render_template`` is added
    to ``request.render_time``.
    """

    # JSON -> UTF-8 bytes -> base64 -> str
    kwargs['client_settings'] = str(
        base64.b64encode(
            bytes(
                json.dumps(get_client_settings()),
                encoding='utf-8',
            )
        ),
        encoding='utf-8',
    )

    # values from the HTTP requests
    kwargs['endpoint'] = 'results' if 'q' in kwargs else request.endpoint
    kwargs['cookies'] = request.cookies
    kwargs['errors'] = request.errors

    # values from the preferences
    kwargs['preferences'] = request.preferences
    kwargs['autocomplete'] = request.preferences.get_value('autocomplete')
    kwargs['infinite_scroll'] = request.preferences.get_value('infinite_scroll')
    kwargs['results_on_new_tab'] = request.preferences.get_value('results_on_new_tab')
    kwargs['advanced_search'] = request.preferences.get_value('advanced_search')
    kwargs['query_in_title'] = request.preferences.get_value('query_in_title')
    kwargs['safesearch'] = str(request.preferences.get_value('safesearch'))
    # The country header is only set by the fronting proxy; use .get() so a
    # request without it does not fail with a KeyError.
    if request.environ.get('HTTP_CF_IPCOUNTRY') == 'CN':
        kwargs['safesearch'] = '1'
    kwargs['theme'] = request.preferences.get_value('theme')
    kwargs['method'] = request.preferences.get_value('method')
    kwargs['categories_as_tabs'] = list(settings['categories_as_tabs'].keys())
    kwargs['categories'] = _get_enable_categories(categories.keys())
    kwargs['OTHER_CATEGORY'] = OTHER_CATEGORY

    # i18n
    kwargs['language_codes'] = [l for l in languages if l[0] in settings['search']['languages']]

    locale = request.preferences.get_value('locale')
    kwargs['locale_rfc5646'] = _get_locale_rfc5646(locale)

    if locale in RTL_LOCALES and 'rtl' not in kwargs:
        kwargs['rtl'] = True
    if 'current_language' not in kwargs:
        kwargs['current_language'] = match_language(
            request.preferences.get_value('language'), settings['search']['languages']
        )

    # values from settings
    kwargs['search_formats'] = [x for x in settings['search']['formats'] if x != 'html']
    kwargs['instance_name'] = get_setting('general.instance_name')
    kwargs['searx_version'] = VERSION_STRING
    kwargs['searx_git_url'] = GIT_URL
    kwargs['enable_metrics'] = get_setting('general.enable_metrics')
    kwargs['get_setting'] = get_setting
    kwargs['get_pretty_url'] = get_pretty_url

    # values from settings: donation_url
    donation_url = get_setting('general.donation_url')
    if donation_url is True:
        donation_url = custom_url_for('info', pagename='donate')
    kwargs['donation_url'] = donation_url

    # helpers to create links to other pages
    kwargs['url_for'] = custom_url_for  # override url_for function in templates
    kwargs['image_proxify'] = image_proxify
    kwargs['proxify'] = morty_proxify if settings['result_proxy']['url'] is not None else None
    kwargs['proxify_results'] = settings['result_proxy']['proxify_results']
    kwargs['cache_url'] = settings['ui']['cache_url']
    kwargs['get_result_template'] = get_result_template
    kwargs['doi_resolver'] = get_doi_resolver(request.preferences)
    kwargs['opensearch_url'] = (
        url_for('opensearch')
        + '?'
        + urlencode(
            {
                'method': request.preferences.get_value('method'),
                'autocomplete': request.preferences.get_value('autocomplete'),
            }
        )
    )

    # scripts from plugins
    kwargs['scripts'] = {script for plugin in request.user_plugins for script in plugin.js_dependencies}

    # styles from plugins
    kwargs['styles'] = {css for plugin in request.user_plugins for css in plugin.css_dependencies}

    start_time = default_timer()
    result = render_template('{}/{}'.format(kwargs['theme'], template_name), **kwargs)
    request.render_time += default_timer() - start_time  # pylint: disable=assigning-non-slot

    return result


@app.before_request
def pre_request():
    """Prepare the per-request state before a view runs.

    Initialises the timing/error attributes, builds the ``Preferences`` from
    cookies and request parameters, merges GET parameters into
    ``request.form`` (POST values win), falls back to the browser's
    ``Accept-Language`` for a missing language/locale and selects the
    plugins enabled for this request.
    """
    request.start_time = default_timer()  # pylint: disable=assigning-non-slot
    request.render_time = 0  # pylint: disable=assigning-non-slot
    request.timings = []  # pylint: disable=assigning-non-slot
    request.errors = []  # pylint: disable=assigning-non-slot

    preferences = Preferences(themes, list(categories.keys()), engines, plugins)  # pylint: disable=redefined-outer-name
    user_agent = request.headers.get('User-Agent', '').lower()
    if 'webkit' in user_agent and 'android' in user_agent:
        preferences.key_value_settings['method'].value = 'GET'
    request.preferences = preferences  # pylint: disable=assigning-non-slot

    try:
        preferences.parse_dict(request.cookies)

    except Exception as e:  # pylint: disable=broad-except
        logger.exception(e, exc_info=True)
        request.errors.append(gettext('Invalid settings, please edit your preferences'))

    # merge GET, POST vars
    # request.form
    request.form = dict(request.form.items())  # pylint: disable=assigning-non-slot
    for k, v in request.args.items():
        if k not in request.form:
            request.form[k] = v

    if request.form.get('preferences'):
        preferences.parse_encoded_data(request.form['preferences'])
    else:
        try:
            preferences.parse_dict(request.form)
        except Exception as e:  # pylint: disable=broad-except
            logger.exception(e, exc_info=True)
            request.errors.append(gettext('Invalid settings'))

    # language is defined neither in settings nor in preferences
    # use browser headers
    if not preferences.get_value("language"):
        language = _get_browser_language(request, settings['search']['languages'])
        preferences.parse_dict({"language": language})
        logger.debug('set language %s (from browser)', preferences.get_value("language"))

    # locale is defined neither in settings nor in preferences
    # use browser headers
    if not preferences.get_value("locale"):
        locale = _get_browser_language(request, LOCALE_NAMES.keys())
        preferences.parse_dict({"locale": locale})
        logger.debug('set locale %s (from browser)', preferences.get_value("locale"))

    # request.user_plugins
    request.user_plugins = []  # pylint: disable=assigning-non-slot
    allowed_plugins = preferences.plugins.get_enabled()
    disabled_plugins = preferences.plugins.get_disabled()
    for plugin in plugins:
        if (plugin.default_on and plugin.id not in disabled_plugins) or plugin.id in allowed_plugins:
            request.user_plugins.append(plugin)
@app.after_request
def add_default_headers(response: flask.Response):
    """Add the configured default HTTP headers the response does not carry yet."""
    defaults = settings['server']['default_http_headers']
    for header, value in defaults.items():
        if header not in response.headers:
            response.headers[header] = value
    return response


@app.after_request
def post_request(response: flask.Response):
    """Assemble the ``Server-Timing`` entries of the request.

    The entries (total, render, then per engine total and load durations in
    milliseconds, engines sorted by their total time) are built, but the
    header itself is currently not added to the response.
    """

    def millis(seconds):
        return str(round(seconds * 1000, 3))

    elapsed = default_timer() - request.start_time
    timings_all = [
        'total;dur=' + millis(elapsed),
        'render;dur=' + millis(request.render_time),
    ]
    if request.timings:
        by_total = sorted(request.timings, key=lambda t: t.total)
        per_engine_total = []
        per_engine_load = []
        for position, timing in enumerate(by_total):
            per_engine_total.append('total_' + str(position) + '_' + timing.engine + ';dur=' + millis(timing.total))
        for position, timing in enumerate(by_total):
            if timing.load:
                per_engine_load.append('load_' + str(position) + '_' + timing.engine + ';dur=' + millis(timing.load))
        timings_all = timings_all + per_engine_total + per_engine_load
    # response.headers.add('Server-Timing', ', '.join(timings_all))
    return response
def index_error(output_format: str, error_message: str):
    """Build the error response of a failed search in the requested format.

    json: ``{"error": error_message}``; csv: an empty attachment; rss: the
    opensearch RSS template without results; anything else: the index page
    with a generic 'search error' appended to ``request.errors``.
    """
    if output_format == 'json':
        payload = json.dumps({'error': error_message})
        return Response(payload, mimetype='application/json')

    if output_format == 'csv':
        csv_response = Response('', mimetype='application/csv')
        csv_response.headers.add('Content-Disposition', 'attachment;Filename=searx.csv')
        return csv_response

    if output_format == 'rss':
        rss_body = render(
            'opensearch_response_rss.xml',
            results=[],
            q=request.form.get('q', ''),
            number_of_results=0,
            error_message=error_message,
        )
        return Response(rss_body, mimetype='text/xml')

    # html
    request.errors.append(gettext('search error'))
    selected = get_selected_categories(request.preferences, request.form)
    return render('index.html', selected_categories=selected)


@app.route('/', methods=['GET', 'POST'])
def index():
    """Render index page."""

    if not request.form.get('q'):
        return render(
            'index.html',
            selected_categories=get_selected_categories(request.preferences, request.form),
            current_locale=request.preferences.get_value("locale"),
        )

    # there is a query in the request: redirect to search, keeping the query string
    query_string = request.query_string
    query = ('?' + query_string.decode()) if query_string else ''
    return redirect(url_for('search') + query, 308)
@app.route('/healthz', methods=['GET'])
def health():
    """Liveness probe: always answers with a plain text ``OK``."""
    return Response('OK', mimetype='text/plain')


@app.route('/keytext', methods=['POST'])
def keytext():
    """Return the key sentences of the posted text as a JSON array.

    Expects a JSON object with a string member ``text``.  Up to 15 key
    sentences found by ``TextRank4Sentence`` are returned.  A body that is
    not such an object is answered with HTTP 400 and a JSON error instead of
    failing with an unhandled exception.
    """
    payload = request.get_json(silent=True)
    text = payload.get('text') if isinstance(payload, dict) else None
    if not isinstance(text, str):
        return Response(
            json.dumps({'error': 'expected a JSON object with a string member "text"'}),
            status=400,
            mimetype='application/json',
        )

    tr4s = TextRank4Sentence()
    tr4s.analyze(text=text, lower=True, source='all_filters')
    res = [item.sentence for item in tr4s.get_key_sentences(num=15)]
    return Response(json.dumps(res), mimetype='application/json')
get_search_query_from_webapp(request.preferences, request.form) + # search = Search(search_query) # without plugins + if request.environ['HTTP_CF_IPCOUNTRY'] == 'CN' and gfw.exists(search_query.query): + return render('404.html'), 404 + try: + original_search_query = search_query.query + if "模仿" in search_query.query or "扮演" in search_query.query or "你能" in search_query.query or "请推荐" in search_query.query or "帮我" in search_query.query or "写一段" in search_query.query or "写一个" in search_query.query or "请问" in search_query.query or "请给" in search_query.query or "请你" in search_query.query or "请推荐" in search_query.query or "是谁" in search_query.query or "能帮忙" in search_query.query or "介绍一下" in search_query.query or "为什么" in search_query.query or "什么是" in search_query.query or "有什么" in search_query.query or "怎样" in search_query.query or "给我" in search_query.query or "如何" in search_query.query or "谁是" in search_query.query or "查询" in search_query.query or "告诉我" in search_query.query or "查一下" in search_query.query or "找一个" in search_query.query or "什么样" in search_query.query or "哪个" in search_query.query or "哪些" in search_query.query or "哪一个" in search_query.query or "哪一些" in search_query.query or "啥是" in search_query.query or "为啥" in search_query.query or "怎么" in search_query.query: + if len(search_query.query)>5 and "谁是" in search_query.query: + search_query.query = search_query.query.replace("谁是","") + if len(search_query.query)>5 and "是谁" in search_query.query: + search_query.query = search_query.query.replace("是谁","") + if len(search_query.query)>5 and not "谁是" in search_query.query and not "是谁" in search_query.query: + prompt = search_query.query + "\n对以上问题生成一个Google搜索词:\n" + search_type = '任务' + net_search = False + net_search_str = 'false' + elif len(original_search_query)>10: + prompt = "任务:写诗 写故事 写代码 写论文摘要 模仿推特用户 生成搜索广告 回答问题 聊天话题 搜索网页 搜索视频 搜索地图 搜索新闻 查看食谱 搜索商品 写歌词 写论文 模仿名人 翻译语言 摘要文章 讲笑话 做数学题 搜索图片 播放音乐 查看天气\n1.判断是以上任务的哪一个2.判断是否需要联网回答3.给出搜索关键词\n" + prompt = prompt + 
"提问:" + search_query.query + '答案用json数组例如["写诗","否","详细关键词"]来表述' + acts = ['写诗', '写故事', '写代码', '写论文摘要', '模仿推特用户', '生成搜索广告', '回答问题', '聊天话题', '搜索网页', '搜索视频', '搜索地图', '搜索新闻', '查看食谱', '搜索商品', '写歌词', '写论文', '模仿名人', '翻译语言', '摘要文章', '讲笑话', '做数学题', '搜索图片', '播放音乐', '查看天气'] + if "今年" in prompt or "今天" in prompt: + now = datetime.datetime.now() + prompt = prompt.replace("今年",now.strftime('%Y年')) + prompt = prompt.replace("今天",now.strftime('%Y年%m月%d日')) + gpt = "" + gpt_url = "https://api.openai.com/v1/chat/completions" + gpt_headers = { + "Authorization": "Bearer "+os.environ['GPTKEY'], + "Content-Type": "application/json", + } + gpt_data = { + "model": "gpt-3.5-turbo", + "messages": [{"role":"user","content":prompt}], + "max_tokens": 256, + "temperature": 0.9, + "top_p": 1, + "frequency_penalty": 0, + "presence_penalty": 0, + "stream": False + } + gpt_json={} + if prompt and prompt !='' : + gpt_response = requests.post(gpt_url, headers=gpt_headers, data=json.dumps(gpt_data)) + gpt_json = gpt_response.json() + if 'choices' in gpt_json: + gpt = gpt_json['choices'][0]['message']['content'] + if search_type == '任务': + for word in gpt.split('\n'): + if word != "": + gpt = word.replace("\"","").replace("\'","").replace("“","").replace("”","").replace("‘","").replace("’","") + break + if gpt!="": + search_query.query = gpt + if 'Google' not in original_search_query and 'google' not in original_search_query and '谷歌' not in original_search_query and ('Google' in search_query.query or 'google' in search_query.query or '谷歌' in search_query.query): + search_query.query=search_query.query.replace("Google","").replace("google","").replace("谷歌","") + else: + gpt_judge = [] + for tmpj in gpt.split(): + try: + gpt_judge = json.loads(tmpj) + except:pass + + if len(gpt_judge)==3 and gpt_judge[0] in acts and gpt_judge[2] != '' and (gpt_judge[1]=='是' or gpt_judge[1]=='True' or gpt_judge[1]=='true'): + search_query.query = gpt_judge[2] + search_type = gpt_judge[0] + net_search = True + 
net_search_str = 'true' + elif len(gpt_judge)==3 and gpt_judge[0] in acts and gpt_judge[2] != '' and (gpt_judge[1]=='否' or gpt_judge[1]=='False' or gpt_judge[1]=='false'): + search_type = gpt_judge[0] + net_search = False + net_search_str = 'false' + except Exception as ee: + logger.exception(ee, exc_info=True) + search = SearchWithPlugins(search_query, request.user_plugins, request) # pylint: disable=redefined-outer-name + + result_container = search.search() + + except SearxParameterException as e: + logger.exception('search error: SearxParameterException') + return index_error(output_format, e.message), 400 + except Exception as e: # pylint: disable=broad-except + logger.exception(e, exc_info=True) + return index_error(output_format, gettext('No item found')), 500 + + # results + results = result_container.get_ordered_results() + number_of_results = result_container.results_number() + if number_of_results < result_container.results_length(): + number_of_results = 0 + + # OPENAI GPT + raws = [] + try: + url_pair = [] + url_proxy = {} + prompt = "" + if request.environ['HTTP_CF_IPCOUNTRY'] == 'CN': + for res in results: + try: + if gfw.exists(res['title']): + results.remove(res) + # return index_error(output_format, gettext('No item found')), 500 + if gfw.exists(res['content']): + # return index_error(output_format, gettext('No item found')), 500 + results.remove(res) + except:pass + for res in results: + if 'engine' in res and res['engine'] == 'twitter': + try: + if gfw.exists(res['title']): + results.remove(res) + # return index_error(output_format, gettext('No item found')), 500 + if gfw.exists(res['content']): + # return index_error(output_format, gettext('No item found')), 500 + results.remove(res) + continue + except:pass + if 'url' not in res: continue + if 'title' not in res: continue + + if 'content' not in res: continue + + + if res['content'] == '': continue + new_url = 'https://url'+str(len(url_pair)) + url_pair.append(res['url']) + 
url_proxy[res['url']] = (morty_proxify(res['url'].replace("://mobile.twitter.com","://nitter.net").replace("://mobile.twitter.com","://nitter.net").replace("://twitter.com","://nitter.net"))) + res['title'] = res['title'].replace("التغريدات مع الردود بواسطة","") + res['content'] = res['content'].replace(" "," ") + res['content'] = res['content'].replace("Translate Tweet. ","") + res['content'] = res['content'].replace("Learn more ","") + res['content'] = res['content'].replace("Translate Tweet.","") + res['content'] = res['content'].replace("Retweeted.","Reposted.") + res['content'] = res['content'].replace("Learn more.","") + res['content'] = res['content'].replace("Show replies.","") + res['content'] = res['content'].replace("See new Tweets. ","") + if "作者简介:金融学客座教授,硕士生导师" in res['content']: res['content']=res['title'] + res['content'] = res['content'].replace("You're unable to view this Tweet because this account owner limits who can view their Tweets.","Private Tweet.") + res['content'] = res['content'].replace("Twitter for Android · ","") + res['content'] = res['content'].replace("This Tweet was deleted by the Tweet author.","Deleted Tweet.") + + if 'engine' in res and res['engine'] == 'wolframalpha_noapi': + tmp_prompt = '运算结果:'+ res['content'] +'\n\n' + else: tmp_prompt = res['title'] +'\n'+ res['content'] + '\n' + new_url +'\n' + if 'engine' in res and res['engine'] == 'wolframalpha_noapi': + raws.insert(0,tmp_prompt) + else: raws.append(tmp_prompt) + if '搜索' in search_type and len( prompt + tmp_prompt +'\n' + "\n以上是关键词 " + original_search_query + " 的搜索结果,用简体中文总结简报,在文中用(网址)标注对应内容来源链接。结果:" ) <1600: + + if 'engine' in res and res['engine'] == 'wolframalpha_noapi': + prompt = tmp_prompt + prompt + '\n' + else: prompt += tmp_prompt +'\n' + elif len( prompt + tmp_prompt +'\n' + "\n以上是 " + original_search_query + " 的网络知识。"+ search_type +",如果使用了网络知识,在文中用(网址)标注对应内容来源链接。结果:") <1600: + if 'engine' in res and res['engine'] == 'wolframalpha_noapi': + prompt = 
tmp_prompt + prompt + '\n' + else: prompt += tmp_prompt +'\n' + if prompt != "": + gpt = "" + gpt_url = "https://search.kg/completions" + gpt_headers = { + "Content-Type": "application/json", + } + if '搜索' not in search_type: + gpt_data = { + "messages": [{'role':'system','content':'如果使用了网络知识,在文中用(网址)标注对应内容来源链接'},{'role':'assistant','content': prompt+"\n以上是 " + original_search_query + " 的网络知识"},{'role':'user','content':original_search_query}] , + "max_tokens": 1000, + "temperature": 0.2, + "top_p": 1, + "frequency_penalty": 0, + "presence_penalty": 0, + "stream": True + } + else: + gpt_data = { + "messages": [{'role':'assistant','content': prompt+"\n以上是 " + original_search_query + " 的搜索结果"},{'role':'user','content':"总结简报,在文中用(网址)标注对应内容来源链接"}] , + "max_tokens": 1000, + "temperature": 0.2, + "top_p": 1, + "frequency_penalty": 0, + "presence_penalty": 0, + "stream": True + } + gpt = json.dumps({'data':gpt_data, 'url_pair':url_pair, 'url_proxy':url_proxy, 'raws': raws}) + gpt = '
+ +''' + ' ' + # gpt_response = requests.post(gpt_url, headers=gpt_headers, data=json.dumps(gpt_data)) + # gpt_json = gpt_response.json() + # if 'choices' in gpt_json: + # gpt = gpt_json['choices'][0]['text'] + # gpt = gpt.replace("简报:","").replace("简报:","") + # for i in range(len(url_pair)-1,-1,-1): + # gpt = gpt.replace("https://url"+str(i),url_pair[i]) + # rgpt = gpt + + if gpt and gpt!="": + if original_search_query != search_query.query: + gpt = "Search 为您搜索:" + search_query.query + "\n\n" + gpt + gpt = gpt + r''' + + + + + + + + + + ''' + # for i in range(1,16): + # gpt = gpt.replace("["+str(i)+"] http","[^"+str(i)+"]: http").replace("["+str(i)+"]http","[^"+str(i)+"]: http").replace("["+str(i)+"]","[^"+str(i)+"]") + # rgpt = gpt + # gpt = markdown.markdown( gpt , extensions=['footnotes']) + + # for i in range(len(url_pair)-1,-1,-1): + # gpt = gpt.replace("#fn:"+str(i),url_pair[i]) + # gpt = gpt.replace("#fn:url"+str(i),url_pair[i]) + # gpt = re.sub(r'