[enh] record details exception per engine

add an new API /stats/errors
This commit is contained in:
Alexandre Flament 2020-11-26 15:12:11 +01:00
parent 6b5a578822
commit 1d0c368746
10 changed files with 495 additions and 39 deletions

View File

@ -134,16 +134,18 @@ The function ``def request(query, params):`` always returns the ``params``
variable. Inside searx, the following paramters can be used to specify a search
request:
============ =========== =========================================================
================== =========== ========================================================================
argument type information
============ =========== =========================================================
================== =========== ========================================================================
url string requested url
method string HTTP request method
headers set HTTP header information
data set HTTP data information (parsed if ``method != 'GET'``)
cookies set HTTP cookies
verify boolean Performing SSL-Validity check
============ =========== =========================================================
max_redirects int maximum redirects, hard limit
soft_max_redirects int maximum redirects, soft limit. Record an error but don't stop the engine
================== =========== ========================================================================
example code

View File

@ -132,8 +132,9 @@ def load_engine(engine_data):
lambda: engine._fetch_supported_languages(get(engine.supported_languages_url)))
engine.stats = {
'sent_search_count': 0, # sent search
'search_count': 0, # succesful search
'result_count': 0,
'search_count': 0,
'engine_time': 0,
'engine_time_count': 0,
'score_count': 0,

View File

@ -34,8 +34,45 @@ class SearxParameterException(SearxException):
class SearxSettingsException(SearxException):
"""Error while loading the settings"""
def __init__(self, message, filename):
super().__init__(message)
self.message = message
self.filename = filename
class SearxEngineException(SearxException):
"""Error inside an engine"""
class SearxXPathSyntaxException(SearxEngineException):
"""Syntax error in a XPATH"""
def __init__(self, xpath_spec, message):
super().__init__(str(xpath_spec) + " " + message)
self.message = message
# str(xpath_spec) to deal with str and XPath instance
self.xpath_str = str(xpath_spec)
class SearxEngineResponseException(SearxEngineException):
"""Impossible to parse the result of an engine"""
class SearxEngineAPIException(SearxEngineResponseException):
"""The website has returned an application error"""
class SearxEngineCaptchaException(SearxEngineResponseException):
"""The website has returned a CAPTCHA"""
class SearxEngineXPathException(SearxEngineResponseException):
"""Error while getting the result of an XPath expression"""
def __init__(self, xpath_spec, message):
super().__init__(str(xpath_spec) + " " + message)
self.message = message
# str(xpath_spec) to deal with str and XPath instance
self.xpath_str = str(xpath_spec)

View File

View File

@ -0,0 +1,142 @@
import typing
import inspect
import logging
from json import JSONDecodeError
from urllib.parse import urlparse
from requests.exceptions import RequestException
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
logging.basicConfig(level=logging.INFO)
errors_per_engines = {}
class ErrorContext:
__slots__ = 'filename', 'function', 'line_no', 'code', 'exception_classname', 'log_message', 'log_parameters'
def __init__(self, filename, function, line_no, code, exception_classname, log_message, log_parameters):
self.filename = filename
self.function = function
self.line_no = line_no
self.code = code
self.exception_classname = exception_classname
self.log_message = log_message
self.log_parameters = log_parameters
def __eq__(self, o) -> bool:
if not isinstance(o, ErrorContext):
return False
return self.filename == o.filename and self.function == o.function and self.line_no == o.line_no\
and self.code == o.code and self.exception_classname == o.exception_classname\
and self.log_message == o.log_message and self.log_parameters == o.log_parameters
def __hash__(self):
return hash((self.filename, self.function, self.line_no, self.code, self.exception_classname, self.log_message,
self.log_parameters))
def __repr__(self):
return "ErrorContext({!r}, {!r}, {!r}, {!r}, {!r}, {!r})".\
format(self.filename, self.line_no, self.code, self.exception_classname, self.log_message,
self.log_parameters)
def add_error_context(engine_name: str, error_context: ErrorContext) -> None:
errors_for_engine = errors_per_engines.setdefault(engine_name, {})
errors_for_engine[error_context] = errors_for_engine.get(error_context, 0) + 1
logger.debug('⚠️ %s: %s', engine_name, str(error_context))
def get_trace(traces):
previous_trace = traces[-1]
for trace in reversed(traces):
if trace.filename.endswith('searx/search.py'):
if previous_trace.filename.endswith('searx/poolrequests.py'):
return trace
if previous_trace.filename.endswith('requests/models.py'):
return trace
return previous_trace
previous_trace = trace
return traces[-1]
def get_hostname(exc: RequestException) -> typing.Optional[None]:
url = exc.request.url
if url is None and exc.response is not None:
url = exc.response.url
return urlparse(url).netloc
def get_request_exception_messages(exc: RequestException)\
-> typing.Tuple[typing.Optional[str], typing.Optional[str], typing.Optional[str]]:
url = None
status_code = None
reason = None
hostname = None
if exc.request is not None:
url = exc.request.url
if url is None and exc.response is not None:
url = exc.response.url
if url is not None:
hostname = str(urlparse(url).netloc)
if exc.response is not None:
status_code = str(exc.response.status_code)
reason = exc.response.reason
return (status_code, reason, hostname)
def get_messages(exc, filename) -> typing.Tuple:
if isinstance(exc, JSONDecodeError):
return (exc.msg, )
if isinstance(exc, TypeError):
return (str(exc), )
if isinstance(exc, ValueError) and 'lxml' in filename:
return (str(exc), )
if isinstance(exc, RequestException):
return get_request_exception_messages(exc)
if isinstance(exc, SearxXPathSyntaxException):
return (exc.xpath_str, exc.message)
if isinstance(exc, SearxEngineXPathException):
return (exc.xpath_str, exc.message)
return ()
def get_exception_classname(exc: Exception) -> str:
exc_class = exc.__class__
exc_name = exc_class.__qualname__
exc_module = exc_class.__module__
if exc_module is None or exc_module == str.__class__.__module__:
return exc_name
return exc_module + '.' + exc_name
def get_error_context(framerecords, exception_classname, log_message, log_parameters) -> ErrorContext:
searx_frame = get_trace(framerecords)
filename = searx_frame.filename
function = searx_frame.function
line_no = searx_frame.lineno
code = searx_frame.code_context[0].strip()
del framerecords
return ErrorContext(filename, function, line_no, code, exception_classname, log_message, log_parameters)
def record_exception(engine_name: str, exc: Exception) -> None:
framerecords = inspect.trace()
try:
exception_classname = get_exception_classname(exc)
log_parameters = get_messages(exc, framerecords[-1][1])
error_context = get_error_context(framerecords, exception_classname, None, log_parameters)
add_error_context(engine_name, error_context)
finally:
del framerecords
def record_error(engine_name: str, log_message: str, log_parameters: typing.Optional[typing.Tuple] = None) -> None:
framerecords = list(reversed(inspect.stack()[1:]))
try:
error_context = get_error_context(framerecords, None, log_message, log_parameters or ())
add_error_context(engine_name, error_context)
finally:
del framerecords

View File

@ -4,6 +4,7 @@ from threading import RLock
from urllib.parse import urlparse, unquote
from searx import logger
from searx.engines import engines
from searx.metrology.error_recorder import record_error
CONTENT_LEN_IGNORED_CHARS_REGEX = re.compile(r'[,;:!?\./\\\\ ()-_]', re.M | re.U)
@ -161,6 +162,7 @@ class ResultContainer:
def extend(self, engine_name, results):
standard_result_count = 0
error_msgs = set()
for result in list(results):
result['engine'] = engine_name
if 'suggestion' in result:
@ -177,14 +179,21 @@ class ResultContainer:
# standard result (url, title, content)
if 'url' in result and not isinstance(result['url'], str):
logger.debug('result: invalid URL: %s', str(result))
error_msgs.add('invalid URL')
elif 'title' in result and not isinstance(result['title'], str):
logger.debug('result: invalid title: %s', str(result))
error_msgs.add('invalid title')
elif 'content' in result and not isinstance(result['content'], str):
logger.debug('result: invalid content: %s', str(result))
error_msgs.add('invalid content')
else:
self._merge_result(result, standard_result_count + 1)
standard_result_count += 1
if len(error_msgs) > 0:
for msg in error_msgs:
record_error(engine_name, 'some results are invalids: ' + msg)
if engine_name in engines:
with RLock():
engines[engine_name].stats['search_count'] += 1

View File

@ -20,6 +20,7 @@ import gc
import threading
from time import time
from uuid import uuid4
from urllib.parse import urlparse
from _thread import start_new_thread
import requests.exceptions
@ -31,6 +32,8 @@ from searx.utils import gen_useragent
from searx.results import ResultContainer
from searx import logger
from searx.plugins import plugins
from searx.exceptions import SearxEngineCaptchaException
from searx.metrology.error_recorder import record_exception, record_error
logger = logger.getChild('search')
@ -120,6 +123,14 @@ def send_http_request(engine, request_params):
if hasattr(engine, 'proxies'):
request_args['proxies'] = requests_lib.get_proxies(engine.proxies)
# max_redirects
max_redirects = request_params.get('max_redirects')
if max_redirects:
request_args['max_redirects'] = max_redirects
# soft_max_redirects
soft_max_redirects = request_params.get('soft_max_redirects', max_redirects or 0)
# specific type of request (GET or POST)
if request_params['method'] == 'GET':
req = requests_lib.get
@ -129,7 +140,23 @@ def send_http_request(engine, request_params):
request_args['data'] = request_params['data']
# send the request
return req(request_params['url'], **request_args)
response = req(request_params['url'], **request_args)
# check HTTP status
response.raise_for_status()
# check soft limit of the redirect count
if len(response.history) > soft_max_redirects:
# unexpected redirect : record an error
# but the engine might still return valid results.
status_code = str(response.status_code or '')
reason = response.reason or ''
hostname = str(urlparse(response.url or '').netloc)
record_error(engine.name,
'{} redirects, maximum: {}'.format(len(response.history), soft_max_redirects),
(status_code, reason, hostname))
return response
def search_one_http_request(engine, query, request_params):
@ -183,8 +210,9 @@ def search_one_http_request_safe(engine_name, query, request_params, result_cont
# update stats with the total HTTP time
engine.stats['page_load_time'] += page_load_time
engine.stats['page_load_count'] += 1
except Exception as e:
record_exception(engine_name, e)
# Timing
engine_time = time() - start_time
page_load_time = requests_lib.get_time_for_thread()
@ -195,23 +223,29 @@ def search_one_http_request_safe(engine_name, query, request_params, result_cont
engine.stats['errors'] += 1
if (issubclass(e.__class__, requests.exceptions.Timeout)):
result_container.add_unresponsive_engine(engine_name, 'timeout')
result_container.add_unresponsive_engine(engine_name, 'HTTP timeout')
# requests timeout (connect or read)
logger.error("engine {0} : HTTP requests timeout"
"(search duration : {1} s, timeout: {2} s) : {3}"
.format(engine_name, engine_time, timeout_limit, e.__class__.__name__))
requests_exception = True
elif (issubclass(e.__class__, requests.exceptions.RequestException)):
result_container.add_unresponsive_engine(engine_name, 'request exception')
result_container.add_unresponsive_engine(engine_name, 'HTTP error')
# other requests exception
logger.exception("engine {0} : requests exception"
"(search duration : {1} s, timeout: {2} s) : {3}"
.format(engine_name, engine_time, timeout_limit, e))
requests_exception = True
elif (issubclass(e.__class__, SearxEngineCaptchaException)):
result_container.add_unresponsive_engine(engine_name, 'CAPTCHA required')
logger.exception('engine {0} : CAPTCHA')
else:
result_container.add_unresponsive_engine(engine_name, 'unexpected crash', str(e))
result_container.add_unresponsive_engine(engine_name, 'unexpected crash')
# others errors
logger.exception('engine {0} : exception : {1}'.format(engine_name, e))
else:
if getattr(threading.current_thread(), '_timeout', False):
record_error(engine_name, 'Timeout')
# suspend or not the engine if there are HTTP errors
with threading.RLock():
@ -255,12 +289,17 @@ def search_one_offline_request_safe(engine_name, query, request_params, result_c
engine.stats['engine_time_count'] += 1
except ValueError as e:
record_exception(engine_name, e)
record_offline_engine_stats_on_error(engine, result_container, start_time)
logger.exception('engine {0} : invalid input : {1}'.format(engine_name, e))
except Exception as e:
record_exception(engine_name, e)
record_offline_engine_stats_on_error(engine, result_container, start_time)
result_container.add_unresponsive_engine(engine_name, 'unexpected crash', str(e))
logger.exception('engine {0} : exception : {1}'.format(engine_name, e))
else:
if getattr(threading.current_thread(), '_timeout', False):
record_error(engine_name, 'Timeout')
def search_one_request_safe(engine_name, query, request_params, result_container, start_time, timeout_limit):
@ -278,6 +317,7 @@ def search_multiple_requests(requests, result_container, start_time, timeout_lim
args=(engine_name, query, request_params, result_container, start_time, timeout_limit),
name=search_id,
)
th._timeout = False
th._engine_name = engine_name
th.start()
@ -286,6 +326,7 @@ def search_multiple_requests(requests, result_container, start_time, timeout_lim
remaining_time = max(0.0, timeout_limit - (time() - start_time))
th.join(remaining_time)
if th.is_alive():
th._timeout = True
result_container.add_unresponsive_engine(th._engine_name, 'timeout')
logger.warning('engine timeout: {0}'.format(th._engine_name))
@ -385,6 +426,9 @@ class Search:
request_params['category'] = engineref.category
request_params['pageno'] = self.search_query.pageno
with threading.RLock():
engine.stats['sent_search_count'] += 1
return request_params, engine.timeout
# do search-request

View File

@ -10,7 +10,7 @@ from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse
from lxml import html
from lxml.etree import XPath, _ElementStringResult, _ElementUnicodeResult
from lxml.etree import ElementBase, XPath, XPathError, XPathSyntaxError, _ElementStringResult, _ElementUnicodeResult
from babel.core import get_global
@ -18,6 +18,7 @@ from searx import settings
from searx.data import USER_AGENTS
from searx.version import VERSION_STRING
from searx.languages import language_codes
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
@ -33,6 +34,13 @@ xpath_cache = dict()
lang_to_lc_cache = dict()
class NotSetClass:
pass
NOTSET = NotSetClass()
def searx_useragent():
"""Return the searx User Agent"""
return 'searx/{searx_version} {suffix}'.format(
@ -125,7 +133,7 @@ def html_to_text(html_str):
return s.get_text()
def extract_text(xpath_results):
def extract_text(xpath_results, allow_none=False):
"""Extract text from a lxml result
* if xpath_results is list, extract the text from each result and concat the list
@ -133,22 +141,27 @@ def extract_text(xpath_results):
( text_content() method from lxml )
* if xpath_results is a string element, then it's already done
"""
if type(xpath_results) == list:
if isinstance(xpath_results, list):
# it's list of result : concat everything using recursive call
result = ''
for e in xpath_results:
result = result + extract_text(e)
return result.strip()
elif type(xpath_results) in [_ElementStringResult, _ElementUnicodeResult]:
# it's a string
return ''.join(xpath_results)
else:
elif isinstance(xpath_results, ElementBase):
# it's a element
text = html.tostring(
xpath_results, encoding='unicode', method='text', with_tail=False
)
text = text.strip().replace('\n', ' ')
return ' '.join(text.split())
elif isinstance(xpath_results, (_ElementStringResult, _ElementUnicodeResult, str, Number, bool)):
return str(xpath_results)
elif xpath_results is None and allow_none:
return None
elif xpath_results is None and not allow_none:
raise ValueError('extract_text(None, allow_none=False)')
else:
raise ValueError('unsupported type')
def normalize_url(url, base_url):
@ -170,7 +183,7 @@ def normalize_url(url, base_url):
>>> normalize_url('', 'https://example.com')
'https://example.com/'
>>> normalize_url('/test', '/path')
raise Exception
raise ValueError
Raises:
* lxml.etree.ParserError
@ -194,7 +207,7 @@ def normalize_url(url, base_url):
# add a / at this end of the url if there is no path
if not parsed_url.netloc:
raise Exception('Cannot parse url')
raise ValueError('Cannot parse url')
if not parsed_url.path:
url += '/'
@ -224,17 +237,17 @@ def extract_url(xpath_results, base_url):
>>> f('', 'https://example.com')
raise lxml.etree.ParserError
>>> searx.utils.extract_url([], 'https://example.com')
raise Exception
raise ValueError
Raises:
* Exception
* ValueError
* lxml.etree.ParserError
Returns:
* str: normalized URL
"""
if xpath_results == []:
raise Exception('Empty url resultset')
raise ValueError('Empty url resultset')
url = extract_text(xpath_results)
return normalize_url(url, base_url)
@ -258,7 +271,6 @@ def dict_subset(d, properties):
def list_get(a_list, index, default=None):
"""Get element in list or default value
Examples:
>>> list_get(['A', 'B', 'C'], 0)
'A'
@ -310,7 +322,7 @@ def get_torrent_size(filesize, filesize_multiplier):
filesize = int(filesize * 1000 * 1000)
elif filesize_multiplier == 'KiB':
filesize = int(filesize * 1000)
except:
except ValueError:
filesize = None
return filesize
@ -506,20 +518,110 @@ def get_engine_from_settings(name):
return {}
def get_xpath(xpath_str):
def get_xpath(xpath_spec):
"""Return cached compiled XPath
There is no thread lock.
Worst case scenario, xpath_str is compiled more than one time.
Args:
* xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
Returns:
* result (bool, float, list, str): Results.
Raises:
* TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
* SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
"""
result = xpath_cache.get(xpath_str, None)
if isinstance(xpath_spec, str):
result = xpath_cache.get(xpath_spec, None)
if result is None:
result = XPath(xpath_str)
xpath_cache[xpath_str] = result
try:
result = XPath(xpath_spec)
except XPathSyntaxError as e:
raise SearxXPathSyntaxException(xpath_spec, str(e.msg))
xpath_cache[xpath_spec] = result
return result
if isinstance(xpath_spec, XPath):
return xpath_spec
raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
def eval_xpath(element, xpath_spec):
"""Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
See https://lxml.de/xpathxslt.html#xpath-return-values
Args:
* element (ElementBase): [description]
* xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
Returns:
* result (bool, float, list, str): Results.
Raises:
* TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
* SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
* SearxEngineXPathException: Raise when the XPath can't be evaluated.
"""
xpath = get_xpath(xpath_spec)
try:
return xpath(element)
except XPathError as e:
arg = ' '.join([str(i) for i in e.args])
raise SearxEngineXPathException(xpath_spec, arg)
def eval_xpath_list(element, xpath_spec, min_len=None):
"""Same as eval_xpath, check if the result is a list
Args:
* element (ElementBase): [description]
* xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
* min_len (int, optional): [description]. Defaults to None.
Raises:
* TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
* SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
* SearxEngineXPathException: raise if the result is not a list
Returns:
* result (bool, float, list, str): Results.
"""
result = eval_xpath(element, xpath_spec)
if not isinstance(result, list):
raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
if min_len is not None and min_len > len(result):
raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
return result
def eval_xpath(element, xpath_str):
"""Equivalent of element.xpath(xpath_str) but compile xpath_str once for all."""
xpath = get_xpath(xpath_str)
return xpath(element)
def eval_xpath_getindex(elements, xpath_spec, index, default=NOTSET):
"""Call eval_xpath_list then get one element using the index parameter.
If the index does not exist, either aise an exception is default is not set,
other return the default value (can be None).
Args:
* elements (ElementBase): lxml element to apply the xpath.
* xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
* index (int): index to get
* default (Object, optional): Defaults if index doesn't exist.
Raises:
* TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
* SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
* SearxEngineXPathException: if the index is not found. Also see eval_xpath.
Returns:
* result (bool, float, list, str): Results.
"""
result = eval_xpath_list(elements, xpath_spec)
if index >= -len(result) and index < len(result):
return result[index]
if default == NOTSET:
# raise an SearxEngineXPathException instead of IndexError
# to record xpath_spec
raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
return default

View File

@ -79,6 +79,7 @@ from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers
from searx.poolrequests import get_global_proxies
from searx.metrology.error_recorder import errors_per_engines
# serve pages with HTTP/1.1
@ -943,6 +944,34 @@ def stats():
)
@app.route('/stats/errors', methods=['GET'])
def stats_errors():
result = {}
engine_names = list(errors_per_engines.keys())
engine_names.sort()
for engine_name in engine_names:
error_stats = errors_per_engines[engine_name]
sent_search_count = max(engines[engine_name].stats['sent_search_count'], 1)
sorted_context_count_list = sorted(error_stats.items(), key=lambda context_count: context_count[1])
r = []
percentage_sum = 0
for context, count in sorted_context_count_list:
percentage = round(20 * count / sent_search_count) * 5
percentage_sum += percentage
r.append({
'filename': context.filename,
'function': context.function,
'line_no': context.line_no,
'code': context.code,
'exception_classname': context.exception_classname,
'log_message': context.log_message,
'log_parameters': context.log_parameters,
'percentage': percentage,
})
result[engine_name] = sorted(r, reverse=True, key=lambda d: d['percentage'])
return jsonify(result)
@app.route('/robots.txt', methods=['GET'])
def robots():
return Response("""User-agent: *

View File

@ -3,6 +3,7 @@ import lxml.etree
from lxml import html
from searx.testing import SearxTestCase
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import utils
@ -57,8 +58,16 @@ class TestUtils(SearxTestCase):
dom = html.fromstring(html_str)
self.assertEqual(utils.extract_text(dom), 'Test text')
self.assertEqual(utils.extract_text(dom.xpath('//span')), 'Test text')
self.assertEqual(utils.extract_text(dom.xpath('//span/text()')), 'Test text')
self.assertEqual(utils.extract_text(dom.xpath('count(//span)')), '3.0')
self.assertEqual(utils.extract_text(dom.xpath('boolean(//span)')), 'True')
self.assertEqual(utils.extract_text(dom.xpath('//img/@src')), 'test.jpg')
self.assertEqual(utils.extract_text(dom.xpath('//unexistingtag')), '')
self.assertEqual(utils.extract_text(None, allow_none=True), None)
with self.assertRaises(ValueError):
utils.extract_text(None)
with self.assertRaises(ValueError):
utils.extract_text({})
def test_extract_url(self):
def f(html_str, search_url):
@ -136,3 +145,84 @@ class TestHTMLTextExtractor(SearxTestCase):
text = '<p><b>Lorem ipsum</i>dolor sit amet</p>'
with self.assertRaises(utils.HTMLTextExtractorException):
self.html_text_extractor.feed(text)
class TestXPathUtils(SearxTestCase):
TEST_DOC = """<ul>
<li>Text in <b>bold</b> and <i>italic</i> </li>
<li>Another <b>text</b> <img src=""></li>
</ul>"""
def test_get_xpath_cache(self):
xp1 = utils.get_xpath('//a')
xp2 = utils.get_xpath('//div')
xp3 = utils.get_xpath('//a')
self.assertEqual(id(xp1), id(xp3))
self.assertNotEqual(id(xp1), id(xp2))
def test_get_xpath_type(self):
utils.get_xpath(lxml.etree.XPath('//a'))
with self.assertRaises(TypeError):
utils.get_xpath([])
def test_get_xpath_invalid(self):
invalid_xpath = '//a[0].text'
with self.assertRaises(SearxXPathSyntaxException) as context:
utils.get_xpath(invalid_xpath)
self.assertEqual(context.exception.message, 'Invalid expression')
self.assertEqual(context.exception.xpath_str, invalid_xpath)
def test_eval_xpath_unregistered_function(self):
doc = html.fromstring(TestXPathUtils.TEST_DOC)
invalid_function_xpath = 'int(//a)'
with self.assertRaises(SearxEngineXPathException) as context:
utils.eval_xpath(doc, invalid_function_xpath)
self.assertEqual(context.exception.message, 'Unregistered function')
self.assertEqual(context.exception.xpath_str, invalid_function_xpath)
def test_eval_xpath(self):
doc = html.fromstring(TestXPathUtils.TEST_DOC)
self.assertEqual(utils.eval_xpath(doc, '//p'), [])
self.assertEqual(utils.eval_xpath(doc, '//i/text()'), ['italic'])
self.assertEqual(utils.eval_xpath(doc, 'count(//i)'), 1.0)
def test_eval_xpath_list(self):
doc = html.fromstring(TestXPathUtils.TEST_DOC)
# check a not empty list
self.assertEqual(utils.eval_xpath_list(doc, '//i/text()'), ['italic'])
# check min_len parameter
with self.assertRaises(SearxEngineXPathException) as context:
utils.eval_xpath_list(doc, '//p', min_len=1)
self.assertEqual(context.exception.message, 'len(xpath_str) < 1')
self.assertEqual(context.exception.xpath_str, '//p')
def test_eval_xpath_getindex(self):
doc = html.fromstring(TestXPathUtils.TEST_DOC)
# check index 0
self.assertEqual(utils.eval_xpath_getindex(doc, '//i/text()', 0), 'italic')
# default is 'something'
self.assertEqual(utils.eval_xpath_getindex(doc, '//i/text()', 1, default='something'), 'something')
# default is None
self.assertEqual(utils.eval_xpath_getindex(doc, '//i/text()', 1, default=None), None)
# index not found
with self.assertRaises(SearxEngineXPathException) as context:
utils.eval_xpath_getindex(doc, '//i/text()', 1)
self.assertEqual(context.exception.message, 'index 1 not found')
# not a list
with self.assertRaises(SearxEngineXPathException) as context:
utils.eval_xpath_getindex(doc, 'count(//i)', 1)
self.assertEqual(context.exception.message, 'the result is not a list')