Merge pull request #100 from return42/webapp-pylint

[pylint] webapp.py
This commit is contained in:
Alexandre Flament 2021-05-27 17:04:32 +02:00 committed by GitHub
commit b48b4c93d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 212 additions and 154 deletions

View file

@ -1,30 +1,118 @@
#!/usr/bin/env python
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-function-docstring
"""WebbApp
'''
searx is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
searx is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with searx. If not, see < http://www.gnu.org/licenses/ >.
(C) 2013- by Adam Tauber, <asciimoo@gmail.com>
'''
"""
import hashlib
import hmac
import json
import os
import sys
if sys.version_info[0] < 3:
print('\033[1;31m Python2 is no longer supported\033[0m')
exit(1)
if __name__ == '__main__':
from os.path import realpath, dirname
sys.path.append(realpath(dirname(realpath(__file__)) + '/../'))
from datetime import datetime, timedelta
from timeit import default_timer
from html import escape
from io import StringIO
import urllib
from urllib.parse import (
urlencode,
urlparse,
)
import httpx
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter # pylint: disable=no-name-in-module
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.serving import WSGIRequestHandler
from flask import (
Flask,
request,
render_template,
url_for,
Response,
make_response,
redirect,
send_from_directory,
)
from flask.ctx import has_request_context
from flask.json import jsonify
from babel.support import Translations
import flask_babel
from flask_babel import (
Babel,
gettext,
format_date,
format_decimal,
)
from searx import logger
from searx import brand, static_path
from searx import (
settings,
searx_dir,
searx_debug,
)
from searx.exceptions import SearxParameterException
from searx.engines import (
categories,
engines,
engine_shortcuts,
)
from searx.webutils import (
UnicodeWriter,
highlight_content,
get_resources_directory,
get_static_files,
get_result_templates,
get_themes,
prettify_url,
new_hmac,
is_flask_run_cmdline,
)
from searx.webadapter import (
get_search_query_from_webapp,
get_selected_categories,
)
from searx.utils import (
html_to_text,
gen_useragent,
dict_subset,
match_language,
)
from searx.version import VERSION_STRING
from searx.query import RawTextQuery
from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import (
Preferences,
ValidationException,
LANGUAGE_CODES,
)
from searx.answerers import answerers
from searx.answerers import ask
from searx.metrics import (
get_engines_stats,
get_engine_errors,
get_reliabilities,
histogram,
counter,
)
# renaming names from searx imports ...
from searx.autocomplete import search_autocomplete, backends as autocomplete_backends
from searx.languages import language_codes as languages
from searx.search import SearchWithPlugins, initialize as search_initialize
from searx.network import stream as http_stream
from searx.search.checker import get_result as checker_get_result
# set Unix thread name
try:
@ -36,74 +124,24 @@ else:
old_thread_init = threading.Thread.__init__
def new_thread_init(self, *args, **kwargs):
# pylint: disable=protected-access, disable=c-extension-no-member
old_thread_init(self, *args, **kwargs)
setproctitle.setthreadtitle(self._name)
threading.Thread.__init__ = new_thread_init
import hashlib
import hmac
import json
import os
if sys.version_info[0] < 3:
print('\033[1;31m Python2 is no longer supported\033[0m')
sys.exit(1)
import httpx
from searx import logger
logger = logger.getChild('webapp')
from datetime import datetime, timedelta
from timeit import default_timer
from html import escape
from io import StringIO
import urllib
from urllib.parse import urlencode, urlparse
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter # pylint: disable=no-name-in-module
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import (
Flask, request, render_template, url_for, Response, make_response,
redirect, send_from_directory
)
from babel.support import Translations
import flask_babel
from flask_babel import Babel, gettext, format_date, format_decimal
from flask.ctx import has_request_context
from flask.json import jsonify
from searx import brand, static_path
from searx import settings, searx_dir, searx_debug
from searx.exceptions import SearxParameterException
from searx.engines import categories, engines, engine_shortcuts
from searx.webutils import (
UnicodeWriter, highlight_content, get_resources_directory,
get_static_files, get_result_templates, get_themes,
prettify_url, new_hmac, is_flask_run_cmdline
)
from searx.webadapter import get_search_query_from_webapp, get_selected_categories
from searx.utils import html_to_text, gen_useragent, dict_subset, match_language
from searx.version import VERSION_STRING
from searx.languages import language_codes as languages
from searx.search import SearchWithPlugins, initialize as search_initialize
from searx.search.checker import get_result as checker_get_result
from searx.query import RawTextQuery
from searx.autocomplete import search_autocomplete, backends as autocomplete_backends
from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers
from searx.network import stream as http_stream
from searx.answerers import ask
from searx.metrics import get_engines_stats, get_engine_errors, get_reliabilities, histogram, counter
# serve pages with HTTP/1.1
from werkzeug.serving import WSGIRequestHandler
WSGIRequestHandler.protocol_version = "HTTP/{}".format(settings['server'].get('http_protocol_version', '1.0'))
# check secret_key
if not searx_debug and settings['server']['secret_key'] == 'ultrasecretkey':
logger.error('server.secret_key is not changed. Please use something else instead of ultrasecretkey.')
exit(1)
sys.exit(1)
# about static
static_path = get_resources_directory(searx_dir, 'static', settings['ui']['static_path'])
@ -123,6 +161,14 @@ for indice, theme in enumerate(themes):
for (dirpath, dirnames, filenames) in os.walk(theme_img_path):
global_favicons[indice].extend(filenames)
STATS_SORT_PARAMETERS = {
'name': (False, 'name', ''),
'score': (True, 'score', 0),
'result_count': (True, 'result_count', 0),
'time': (False, 'total', 0),
'reliability': (False, 'reliability', 100),
}
# Flask app
app = Flask(
__name__,
@ -217,8 +263,8 @@ def _get_translations():
flask_babel.get_translations = _get_translations
def _get_browser_or_settings_language(request, lang_list):
for lang in request.headers.get("Accept-Language", "en").split(","):
def _get_browser_or_settings_language(req, lang_list):
for lang in req.headers.get("Accept-Language", "en").split(","):
if ';' in lang:
lang = lang.split(';')[0]
if '-' in lang:
@ -269,9 +315,10 @@ def code_highlighter(codelines, language=None):
try:
# find lexer by programing language
lexer = get_lexer_by_name(language, stripall=True)
except:
except Exception as e: # pylint: disable=broad-except
logger.exception(e, exc_info=True)
# if lexer is not found, using default one
logger.debug('highlighter cannot find lexer for {0}'.format(language))
lexer = get_lexer_by_name('text', stripall=True)
html_code = ''
@ -336,8 +383,8 @@ def get_current_theme_name(override=None):
return theme_name
def get_result_template(theme, template_name):
themed_path = theme + '/result_templates/' + template_name
def get_result_template(theme_name, template_name):
themed_path = theme_name + '/result_templates/' + template_name
if themed_path in result_templates:
return themed_path
return 'result_templates/' + template_name
@ -386,8 +433,7 @@ def image_proxify(url):
and partial_base64[0] in ['gif', 'png', 'jpeg', 'pjpeg', 'webp', 'tiff', 'bmp']\
and partial_base64[1].startswith('base64,'):
return url
else:
return None
return None
if settings.get('result_proxy'):
return proxify(url)
@ -506,14 +552,17 @@ def pre_request():
request.timings = [] # pylint: disable=assigning-non-slot
request.errors = [] # pylint: disable=assigning-non-slot
preferences = Preferences(themes, list(categories.keys()), engines, plugins)
preferences = Preferences(themes, list(categories.keys()), engines, plugins) # pylint: disable=redefined-outer-name
user_agent = request.headers.get('User-Agent', '').lower()
if 'webkit' in user_agent and 'android' in user_agent:
preferences.key_value_settings['method'].value = 'GET'
request.preferences = preferences # pylint: disable=assigning-non-slot
try:
preferences.parse_dict(request.cookies)
except:
except Exception as e: # pylint: disable=broad-except
logger.exception(e, exc_info=True)
request.errors.append(gettext('Invalid settings, please edit your preferences'))
# merge GET, POST vars
@ -528,8 +577,8 @@ def pre_request():
else:
try:
preferences.parse_dict(request.form)
except Exception:
logger.exception('invalid settings')
except Exception as e: # pylint: disable=broad-except
logger.exception(e, exc_info=True)
request.errors.append(gettext('Invalid settings'))
# init search language and locale
@ -578,12 +627,13 @@ def index_error(output_format, error_message):
if output_format == 'json':
return Response(json.dumps({'error': error_message}),
mimetype='application/json')
elif output_format == 'csv':
if output_format == 'csv':
response = Response('', mimetype='application/csv')
cont_disp = 'attachment;Filename=searx.csv'
response.headers.add('Content-Disposition', cont_disp)
return response
elif output_format == 'rss':
if output_format == 'rss':
response_rss = render(
'opensearch_response_rss.xml',
results=[],
@ -594,13 +644,13 @@ def index_error(output_format, error_message):
override_theme='__common__',
)
return Response(response_rss, mimetype='text/xml')
else:
# html
request.errors.append(gettext('search error'))
return render(
'index.html',
selected_categories=get_selected_categories(request.preferences, request.form),
)
# html
request.errors.append(gettext('search error'))
return render(
'index.html',
selected_categories=get_selected_categories(request.preferences, request.form),
)
@app.route('/', methods=['GET', 'POST'])
@ -628,6 +678,8 @@ def search():
Supported outputs: html, json, csv, rss.
"""
# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches
# pylint: disable=too-many-statements
# output_format
output_format = request.form.get('format', 'html')
@ -642,8 +694,7 @@ def search():
advanced_search=request.preferences.get_value('advanced_search'),
selected_categories=get_selected_categories(request.preferences, request.form),
)
else:
return index_error(output_format, 'No query'), 400
return index_error(output_format, 'No query'), 400
# search
search_query = None
@ -652,15 +703,15 @@ def search():
try:
search_query, raw_text_query, _, _ = get_search_query_from_webapp(request.preferences, request.form)
# search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request.user_plugins, request)
search = SearchWithPlugins(search_query, request.user_plugins, request) # pylint: disable=redefined-outer-name
result_container = search.search()
except SearxParameterException as e:
logger.exception('search error: SearxParameterException')
return index_error(output_format, e.message), 400
except Exception as e:
logger.exception('search error')
except Exception as e: # pylint: disable=broad-except
logger.exception(e, exc_info=True)
return index_error(output_format, gettext('search error')), 500
# results
@ -692,7 +743,7 @@ def search():
if 'url' in result:
result['pretty_url'] = prettify_url(result['url'])
# TODO, check if timezone is calculated right
# TODO, check if timezone is calculated right # pylint: disable=fixme
if result.get('publishedDate'): # do not try to get a date from an empty string or a None type
try: # test if publishedDate >= 1900 (datetime module bug)
result['pubdate'] = result['publishedDate'].strftime('%Y-%m-%d %H:%M:%S%z')
@ -706,22 +757,32 @@ def search():
if hours == 0:
result['publishedDate'] = gettext('{minutes} minute(s) ago').format(minutes=minutes)
else:
result['publishedDate'] = gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes) # noqa
result['publishedDate'] = gettext(
'{hours} hour(s), {minutes} minute(s) ago').format(
hours=hours, minutes=minutes
)
else:
result['publishedDate'] = format_date(result['publishedDate'])
if output_format == 'json':
return Response(json.dumps({'query': search_query.query,
'number_of_results': number_of_results,
'results': results,
'answers': list(result_container.answers),
'corrections': list(result_container.corrections),
'infoboxes': result_container.infoboxes,
'suggestions': list(result_container.suggestions),
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines)}, # noqa
default=lambda item: list(item) if isinstance(item, set) else item),
mimetype='application/json')
elif output_format == 'csv':
return Response(
json.dumps(
{
'query': search_query.query,
'number_of_results': number_of_results,
'results': results,
'answers': list(result_container.answers),
'corrections': list(result_container.corrections),
'infoboxes': result_container.infoboxes,
'suggestions': list(result_container.suggestions),
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines)
},
default = lambda item: list(item) if isinstance(item, set) else item
),
mimetype='application/json'
)
if output_format == 'csv':
csv = UnicodeWriter(StringIO())
keys = ('title', 'url', 'content', 'host', 'engine', 'score', 'type')
csv.writerow(keys)
@ -744,7 +805,7 @@ def search():
response.headers.add('Content-Disposition', cont_disp)
return response
elif output_format == 'rss':
if output_format == 'rss':
response_rss = render(
'opensearch_response_rss.xml',
results=results,
@ -882,6 +943,9 @@ def autocompleter():
def preferences():
"""Render preferences page && save user preferences"""
# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches
# pylint: disable=too-many-statements
# save preferences
if request.method == 'POST':
resp = make_response(redirect(url_for('index', _external=True)))
@ -893,7 +957,7 @@ def preferences():
return request.preferences.save(resp)
# render preferences
image_proxy = request.preferences.get_value('image_proxy')
image_proxy = request.preferences.get_value('image_proxy') # pylint: disable=redefined-outer-name
disabled_engines = request.preferences.engines.get_disabled()
allowed_plugins = request.preferences.plugins.get_enabled()
@ -908,7 +972,7 @@ def preferences():
# get first element [0], the engine time,
# and then the second element [1] : the time (the first one is the label)
stats = {}
stats = {} # pylint: disable=redefined-outer-name
max_rate95 = 0
for _, e in filtered_engines.items():
h = histogram('engine', e.name, 'time', 'total')
@ -1025,7 +1089,7 @@ def preferences():
preferences=True)
def _is_selected_language_supported(engine, preferences):
def _is_selected_language_supported(engine, preferences): # pylint: disable=redefined-outer-name
language = preferences.get_value('language')
return (language == 'all'
or match_language(language,
@ -1035,6 +1099,8 @@ def _is_selected_language_supported(engine, preferences):
@app.route('/image_proxy', methods=['GET'])
def image_proxy():
# pylint: disable=too-many-return-statements
url = request.args.get('url')
if not url:
@ -1113,18 +1179,10 @@ def stats():
engine_stats = get_engines_stats(filtered_engines)
engine_reliabilities = get_reliabilities(filtered_engines, checker_results)
SORT_PARAMETERS = {
'name': (False, 'name', ''),
'score': (True, 'score', 0),
'result_count': (True, 'result_count', 0),
'time': (False, 'total', 0),
'reliability': (False, 'reliability', 100),
}
if sort_order not in SORT_PARAMETERS:
if sort_order not in STATS_SORT_PARAMETERS:
sort_order = 'name'
reverse, key_name, default_value = SORT_PARAMETERS[sort_order]
reverse, key_name, default_value = STATS_SORT_PARAMETERS[sort_order]
def get_key(engine_stat):
reliability = engine_reliabilities.get(engine_stat['name']).get('reliablity', 0)
@ -1197,14 +1255,16 @@ def opensearch():
@app.route('/favicon.ico')
def favicon():
return send_from_directory(os.path.join(app.root_path,
static_path,
'themes',
get_current_theme_name(),
'img'),
'favicon.png',
mimetype='image/vnd.microsoft.icon')
return send_from_directory(
os.path.join(
app.root_path,
static_path,
'themes',
get_current_theme_name(),
'img'),
'favicon.png',
mimetype = 'image/vnd.microsoft.icon'
)
@app.route('/clear_cookies')
def clear_cookies():
@ -1259,13 +1319,13 @@ def config():
'GIT_URL': brand.GIT_URL,
'DOCS_URL': brand.DOCS_URL
},
'doi_resolvers': [r for r in settings['doi_resolvers']],
'doi_resolvers': list(settings['doi_resolvers'].keys()),
'default_doi_resolver': settings['default_doi_resolver'],
})
@app.errorhandler(404)
def page_not_found(e):
def page_not_found(_e):
return render('404.html'), 404
@ -1297,12 +1357,13 @@ class ReverseProxyPathFix:
proxy_set_header X-Script-Name /myprefix;
}
:param app: the WSGI application
:param wsgi_app: the WSGI application
'''
# pylint: disable=too-few-public-methods
def __init__(self, app):
def __init__(self, wsgi_app):
self.app = app
self.wsgi_app = wsgi_app
self.script_name = None
self.scheme = None
self.server = None
@ -1336,7 +1397,7 @@ class ReverseProxyPathFix:
server = self.server or environ.get('HTTP_X_FORWARDED_HOST', '')
if server:
environ['HTTP_HOST'] = server
return self.app(environ, start_response)
return self.wsgi_app(environ, start_response)
application = app