Merge pull request #733 from dalf/searchpy

Search architecture
This commit is contained in:
Adam Tauber 2016-11-04 19:38:21 +01:00 committed by GitHub
commit 51eafdd471
9 changed files with 280 additions and 233 deletions

View File

@ -20,12 +20,12 @@ def extract_doi(url):
return None return None
def on_result(request, ctx): def on_result(request, search, result):
doi = extract_doi(ctx['result']['parsed_url']) doi = extract_doi(result['parsed_url'])
if doi and len(doi) < 50: if doi and len(doi) < 50:
for suffix in ('/', '.pdf', '/full', '/meta', '/abstract'): for suffix in ('/', '.pdf', '/full', '/meta', '/abstract'):
if doi.endswith(suffix): if doi.endswith(suffix):
doi = doi[:-len(suffix)] doi = doi[:-len(suffix)]
ctx['result']['url'] = 'http://doai.io/' + doi result['url'] = 'http://doai.io/' + doi
ctx['result']['parsed_url'] = urlparse(ctx['result']['url']) result['parsed_url'] = urlparse(ctx['result']['url'])
return True return True

View File

@ -220,8 +220,7 @@ def https_url_rewrite(result):
return result return result
def on_result(request, ctx): def on_result(request, search, result):
result = ctx['result']
if result['parsed_url'].scheme == 'http': if result['parsed_url'].scheme == 'http':
https_url_rewrite(result) https_url_rewrite(result)
return True return True

View File

@ -28,19 +28,19 @@ p = re.compile('.*user[ -]agent.*', re.IGNORECASE)
# attach callback to the post search hook # attach callback to the post search hook
# request: flask request object # request: flask request object
# ctx: the whole local context of the pre search hook # ctx: the whole local context of the pre search hook
def post_search(request, ctx): def post_search(request, search):
if ctx['search'].pageno > 1: if search.search_query.pageno > 1:
return True return True
if ctx['search'].query == 'ip': if search.search_query.query == 'ip':
x_forwarded_for = request.headers.getlist("X-Forwarded-For") x_forwarded_for = request.headers.getlist("X-Forwarded-For")
if x_forwarded_for: if x_forwarded_for:
ip = x_forwarded_for[0] ip = x_forwarded_for[0]
else: else:
ip = request.remote_addr ip = request.remote_addr
ctx['search'].result_container.answers.clear() search.result_container.answers.clear()
ctx['search'].result_container.answers.add(ip) search.result_container.answers.add(ip)
elif p.match(ctx['search'].query): elif p.match(search.search_query.query):
ua = request.user_agent ua = request.user_agent
ctx['search'].result_container.answers.clear() search.result_container.answers.clear()
ctx['search'].result_container.answers.add(ua) search.result_container.answers.add(ua)
return True return True

View File

@ -28,8 +28,8 @@ description = gettext('Remove trackers arguments from the returned URL')
default_on = True default_on = True
def on_result(request, ctx): def on_result(request, search, result):
query = ctx['result']['parsed_url'].query query = result['parsed_url'].query
if query == "": if query == "":
return True return True
@ -37,8 +37,8 @@ def on_result(request, ctx):
for reg in regexes: for reg in regexes:
query = reg.sub('', query) query = reg.sub('', query)
if query != ctx['result']['parsed_url'].query: if query != result['parsed_url'].query:
ctx['result']['parsed_url'] = ctx['result']['parsed_url']._replace(query=query) result['parsed_url'] = result['parsed_url']._replace(query=query)
ctx['result']['url'] = urlunparse(ctx['result']['parsed_url']) result['url'] = urlunparse(result['parsed_url'])
return True return True

View File

@ -25,8 +25,8 @@ import string
import re import re
class Query(object): class RawTextQuery(object):
"""parse query""" """parse raw text query (the value from the html input)"""
def __init__(self, query, disabled_engines): def __init__(self, query, disabled_engines):
self.query = query self.query = query
@ -130,3 +130,19 @@ class Query(object):
def getFullQuery(self): def getFullQuery(self):
# get full querry including whitespaces # get full querry including whitespaces
return string.join(self.query_parts, '') return string.join(self.query_parts, '')
class SearchQuery(object):
"""container for all the search parameters (query, language, etc...)"""
def __init__(self, query, engines, categories, lang, safesearch, pageno, time_range):
self.query = query
self.engines = engines
self.categories = categories
self.lang = lang
self.safesearch = safesearch
self.pageno = pageno
self.time_range = time_range
def __str__(self):
return str(self.query) + ";" + str(self.engines)

View File

@ -128,6 +128,8 @@ class ResultContainer(object):
self.suggestions = set() self.suggestions = set()
self.answers = set() self.answers = set()
self._number_of_results = [] self._number_of_results = []
self._ordered = False
self.paging = False
def extend(self, engine_name, results): def extend(self, engine_name, results):
for result in list(results): for result in list(results):
@ -153,6 +155,9 @@ class ResultContainer(object):
self.results[engine_name].extend(results) self.results[engine_name].extend(results)
if not self.paging and engines[engine_name].paging:
self.paging = True
for i, result in enumerate(results): for i, result in enumerate(results):
try: try:
result['url'] = result['url'].decode('utf-8') result['url'] = result['url'].decode('utf-8')
@ -219,7 +224,7 @@ class ResultContainer(object):
with RLock(): with RLock():
self._merged_results.append(result) self._merged_results.append(result)
def get_ordered_results(self): def order_results(self):
for result in self._merged_results: for result in self._merged_results:
score = result_score(result) score = result_score(result)
result['score'] = score result['score'] = score
@ -269,8 +274,14 @@ class ResultContainer(object):
# update categoryIndex # update categoryIndex
categoryPositions[category] = {'index': len(gresults), 'count': 8} categoryPositions[category] = {'index': len(gresults), 'count': 8}
# return gresults # update _merged_results
return gresults self._ordered = True
self._merged_results = gresults
def get_ordered_results(self):
if not self._ordered:
self.order_results()
return self._merged_results
def results_length(self): def results_length(self):
return len(self._merged_results) return len(self._merged_results)

View File

@ -25,9 +25,10 @@ from searx.engines import (
categories, engines categories, engines
) )
from searx.utils import gen_useragent from searx.utils import gen_useragent
from searx.query import Query from searx.query import RawTextQuery, SearchQuery
from searx.results import ResultContainer from searx.results import ResultContainer
from searx import logger from searx import logger
from searx.plugins import plugins
logger = logger.getChild('search') logger = logger.getChild('search')
@ -127,135 +128,130 @@ def make_callback(engine_name, callback, params, result_container):
return process_callback return process_callback
def get_search_query_from_webapp(preferences, form):
query = None
query_engines = []
query_categories = []
query_paging = False
query_pageno = 1
query_lang = 'all'
query_time_range = None
# set blocked engines
disabled_engines = preferences.engines.get_disabled()
# set specific language if set
query_lang = preferences.get_value('language')
# safesearch
query_safesearch = preferences.get_value('safesearch')
# TODO better exceptions
if not form.get('q'):
raise Exception('noquery')
# set pagenumber
pageno_param = form.get('pageno', '1')
if not pageno_param.isdigit() or int(pageno_param) < 1:
pageno_param = 1
query_pageno = int(pageno_param)
# parse query, if tags are set, which change
# the serch engine or search-language
raw_text_query = RawTextQuery(form['q'], disabled_engines)
raw_text_query.parse_query()
# set query
query = raw_text_query.getSearchQuery()
# get last selected language in query, if possible
# TODO support search with multible languages
if len(raw_text_query.languages):
query_lang = raw_text_query.languages[-1]
query_time_range = form.get('time_range')
query_engines = raw_text_query.engines
# if engines are calculated from query,
# set categories by using that informations
if query_engines and raw_text_query.specific:
query_categories = list(set(engine['category']
for engine in query_engines))
# otherwise, using defined categories to
# calculate which engines should be used
else:
# set categories/engines
load_default_categories = True
for pd_name, pd in form.items():
if pd_name == 'categories':
query_categories.extend(categ for categ in map(unicode.strip, pd.split(',')) if categ in categories)
elif pd_name == 'engines':
pd_engines = [{'category': engines[engine].categories[0],
'name': engine}
for engine in map(unicode.strip, pd.split(',')) if engine in engines]
if pd_engines:
query_engines.extend(pd_engines)
load_default_categories = False
elif pd_name.startswith('category_'):
category = pd_name[9:]
# if category is not found in list, skip
if category not in categories:
continue
if pd != 'off':
# add category to list
query_categories.append(category)
elif category in query_categories:
# remove category from list if property is set to 'off'
query_categories.remove(category)
if not load_default_categories:
if not query_categories:
query_categories = list(set(engine['category']
for engine in engines))
else:
# if no category is specified for this search,
# using user-defined default-configuration which
# (is stored in cookie)
if not query_categories:
cookie_categories = preferences.get_value('categories')
for ccateg in cookie_categories:
if ccateg in categories:
query_categories.append(ccateg)
# if still no category is specified, using general
# as default-category
if not query_categories:
query_categories = ['general']
# using all engines for that search, which are
# declared under the specific categories
for categ in query_categories:
query_engines.extend({'category': categ,
'name': engine.name}
for engine in categories[categ]
if (engine.name, categ) not in disabled_engines)
return SearchQuery(query, query_engines, query_categories,
query_lang, query_safesearch, query_pageno, query_time_range)
class Search(object): class Search(object):
"""Search information container""" """Search information container"""
def __init__(self, request): def __init__(self, search_query):
# init vars # init vars
super(Search, self).__init__() super(Search, self).__init__()
self.query = None self.search_query = search_query
self.engines = []
self.categories = []
self.paging = False
self.pageno = 1
self.lang = 'all'
self.time_range = None
self.is_advanced = None
# set blocked engines
self.disabled_engines = request.preferences.engines.get_disabled()
self.result_container = ResultContainer() self.result_container = ResultContainer()
self.request_data = {}
# set specific language if set
self.lang = request.preferences.get_value('language')
# set request method
if request.method == 'POST':
self.request_data = request.form
else:
self.request_data = request.args
# TODO better exceptions
if not self.request_data.get('q'):
raise Exception('noquery')
# set pagenumber
pageno_param = self.request_data.get('pageno', '1')
if not pageno_param.isdigit() or int(pageno_param) < 1:
pageno_param = 1
self.pageno = int(pageno_param)
# parse query, if tags are set, which change
# the serch engine or search-language
query_obj = Query(self.request_data['q'], self.disabled_engines)
query_obj.parse_query()
# set query
self.query = query_obj.getSearchQuery()
# get last selected language in query, if possible
# TODO support search with multible languages
if len(query_obj.languages):
self.lang = query_obj.languages[-1]
self.time_range = self.request_data.get('time_range')
self.is_advanced = self.request_data.get('advanced_search')
self.engines = query_obj.engines
# if engines are calculated from query,
# set categories by using that informations
if self.engines and query_obj.specific:
self.categories = list(set(engine['category']
for engine in self.engines))
# otherwise, using defined categories to
# calculate which engines should be used
else:
# set categories/engines
load_default_categories = True
for pd_name, pd in self.request_data.items():
if pd_name == 'categories':
self.categories.extend(categ for categ in map(unicode.strip, pd.split(',')) if categ in categories)
elif pd_name == 'engines':
pd_engines = [{'category': engines[engine].categories[0],
'name': engine}
for engine in map(unicode.strip, pd.split(',')) if engine in engines]
if pd_engines:
self.engines.extend(pd_engines)
load_default_categories = False
elif pd_name.startswith('category_'):
category = pd_name[9:]
# if category is not found in list, skip
if category not in categories:
continue
if pd != 'off':
# add category to list
self.categories.append(category)
elif category in self.categories:
# remove category from list if property is set to 'off'
self.categories.remove(category)
if not load_default_categories:
if not self.categories:
self.categories = list(set(engine['category']
for engine in self.engines))
return
# if no category is specified for this search,
# using user-defined default-configuration which
# (is stored in cookie)
if not self.categories:
cookie_categories = request.preferences.get_value('categories')
for ccateg in cookie_categories:
if ccateg in categories:
self.categories.append(ccateg)
# if still no category is specified, using general
# as default-category
if not self.categories:
self.categories = ['general']
# using all engines for that search, which are
# declared under the specific categories
for categ in self.categories:
self.engines.extend({'category': categ,
'name': engine.name}
for engine in categories[categ]
if (engine.name, categ) not in self.disabled_engines)
# remove suspended engines
self.engines = [e for e in self.engines
if engines[e['name']].suspend_end_time <= time()]
# do search-request # do search-request
def search(self, request): def search(self):
global number_of_searches global number_of_searches
# init vars # init vars
@ -268,23 +264,30 @@ class Search(object):
# user_agent = request.headers.get('User-Agent', '') # user_agent = request.headers.get('User-Agent', '')
user_agent = gen_useragent() user_agent = gen_useragent()
search_query = self.search_query
# start search-reqest for all selected engines # start search-reqest for all selected engines
for selected_engine in self.engines: for selected_engine in search_query.engines:
if selected_engine['name'] not in engines: if selected_engine['name'] not in engines:
continue continue
engine = engines[selected_engine['name']] engine = engines[selected_engine['name']]
# skip suspended engines
if engine.suspend_end_time and engine.suspend_end_time <= time():
continue
# if paging is not supported, skip # if paging is not supported, skip
if self.pageno > 1 and not engine.paging: if search_query.pageno > 1 and not engine.paging:
continue continue
# if search-language is set and engine does not # if search-language is set and engine does not
# provide language-support, skip # provide language-support, skip
if self.lang != 'all' and not engine.language_support: if search_query.lang != 'all' and not engine.language_support:
continue continue
if self.time_range and not engine.time_range_support: # if time_range is not supported, skip
if search_query.time_range and not engine.time_range_support:
continue continue
# set default request parameters # set default request parameters
@ -292,21 +295,20 @@ class Search(object):
request_params['headers']['User-Agent'] = user_agent request_params['headers']['User-Agent'] = user_agent
request_params['category'] = selected_engine['category'] request_params['category'] = selected_engine['category']
request_params['started'] = time() request_params['started'] = time()
request_params['pageno'] = self.pageno request_params['pageno'] = search_query.pageno
if hasattr(engine, 'language') and engine.language: if hasattr(engine, 'language') and engine.language:
request_params['language'] = engine.language request_params['language'] = engine.language
else: else:
request_params['language'] = self.lang request_params['language'] = search_query.lang
# 0 = None, 1 = Moderate, 2 = Strict # 0 = None, 1 = Moderate, 2 = Strict
request_params['safesearch'] = request.preferences.get_value('safesearch') request_params['safesearch'] = search_query.safesearch
request_params['time_range'] = self.time_range request_params['time_range'] = search_query.time_range
request_params['advanced_search'] = self.is_advanced
# update request parameters dependent on # update request parameters dependent on
# search-engine (contained in engines folder) # search-engine (contained in engines folder)
engine.request(self.query.encode('utf-8'), request_params) engine.request(search_query.query.encode('utf-8'), request_params)
if request_params['url'] is None: if request_params['url'] is None:
# TODO add support of offline engines # TODO add support of offline engines
@ -346,10 +348,32 @@ class Search(object):
selected_engine['name'])) selected_engine['name']))
if not requests: if not requests:
return self return self.result_container
# send all search-request # send all search-request
threaded_requests(requests) threaded_requests(requests)
start_new_thread(gc.collect, tuple()) start_new_thread(gc.collect, tuple())
# return results, suggestions, answers and infoboxes # return results, suggestions, answers and infoboxes
return self return self.result_container
class SearchWithPlugins(Search):
"""Similar to the Search class but call the plugins."""
def __init__(self, search_query, request):
super(SearchWithPlugins, self).__init__(search_query)
self.request = request
def search(self):
if plugins.call('pre_search', self.request, self):
super(SearchWithPlugins, self).search()
plugins.call('post_search', self.request, self)
results = self.result_container.get_ordered_results()
for result in results:
plugins.call('on_result', self.request, self, result)
return self.result_container

View File

@ -62,8 +62,8 @@ from searx.utils import (
) )
from searx.version import VERSION_STRING from searx.version import VERSION_STRING
from searx.languages import language_codes from searx.languages import language_codes
from searx.search import Search from searx.search import Search, SearchWithPlugins, get_search_query_from_webapp
from searx.query import Query from searx.query import RawTextQuery, SearchQuery
from searx.autocomplete import searx_bang, backends as autocomplete_backends from searx.autocomplete import searx_bang, backends as autocomplete_backends
from searx.plugins import plugins from searx.plugins import plugins
from searx.preferences import Preferences, ValidationException from searx.preferences import Preferences, ValidationException
@ -373,11 +373,13 @@ def pre_request():
logger.warning('Invalid config') logger.warning('Invalid config')
request.preferences = preferences request.preferences = preferences
# request.form
request.form = dict(request.form.items()) request.form = dict(request.form.items())
for k, v in request.args.items(): for k, v in request.args.items():
if k not in request.form: if k not in request.form:
request.form[k] = v request.form[k] = v
# request.user_plugins
request.user_plugins = [] request.user_plugins = []
allowed_plugins = preferences.plugins.get_enabled() allowed_plugins = preferences.plugins.get_enabled()
disabled_plugins = preferences.plugins.get_disabled() disabled_plugins = preferences.plugins.get_disabled()
@ -400,30 +402,33 @@ def index():
'index.html', 'index.html',
) )
# search
search_query = None
result_container = None
try: try:
search = Search(request) search_query = get_search_query_from_webapp(request.preferences, request.form)
# search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request)
result_container = search.search()
except: except:
return render( return render(
'index.html', 'index.html',
) )
if plugins.call('pre_search', request, locals()): results = result_container.get_ordered_results()
search.search(request)
plugins.call('post_search', request, locals()) # UI
advanced_search = request.form.get('advanced_search', None)
results = search.result_container.get_ordered_results() output_format = request.form.get('format', 'html')
if output_format not in ['html', 'csv', 'json', 'rss']:
output_format = 'html'
# output
for result in results: for result in results:
if output_format == 'html':
plugins.call('on_result', request, locals())
if not search.paging and engines[result['engine']].paging:
search.paging = True
if search.request_data.get('format', 'html') == 'html':
if 'content' in result and result['content']: if 'content' in result and result['content']:
result['content'] = highlight_content(result['content'][:1024], search.query.encode('utf-8')) result['content'] = highlight_content(result['content'][:1024], search_query.query.encode('utf-8'))
result['title'] = highlight_content(result['title'], search.query.encode('utf-8')) result['title'] = highlight_content(result['title'], search_query.query.encode('utf-8'))
else: else:
if result.get('content'): if result.get('content'):
result['content'] = html_to_text(result['content']).strip() result['content'] = html_to_text(result['content']).strip()
@ -450,16 +455,16 @@ def index():
else: else:
result['publishedDate'] = format_date(result['publishedDate']) result['publishedDate'] = format_date(result['publishedDate'])
number_of_results = search.result_container.results_number() number_of_results = result_container.results_number()
if number_of_results < search.result_container.results_length(): if number_of_results < result_container.results_length():
number_of_results = 0 number_of_results = 0
if search.request_data.get('format') == 'json': if output_format == 'json':
return Response(json.dumps({'query': search.query, return Response(json.dumps({'query': search_query.query,
'number_of_results': number_of_results, 'number_of_results': number_of_results,
'results': results}), 'results': results}),
mimetype='application/json') mimetype='application/json')
elif search.request_data.get('format') == 'csv': elif output_format == 'csv':
csv = UnicodeWriter(cStringIO.StringIO()) csv = UnicodeWriter(cStringIO.StringIO())
keys = ('title', 'url', 'content', 'host', 'engine', 'score') keys = ('title', 'url', 'content', 'host', 'engine', 'score')
csv.writerow(keys) csv.writerow(keys)
@ -468,14 +473,14 @@ def index():
csv.writerow([row.get(key, '') for key in keys]) csv.writerow([row.get(key, '') for key in keys])
csv.stream.seek(0) csv.stream.seek(0)
response = Response(csv.stream.read(), mimetype='application/csv') response = Response(csv.stream.read(), mimetype='application/csv')
cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search.query.encode('utf-8')) cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search_query.query.encode('utf-8'))
response.headers.add('Content-Disposition', cont_disp) response.headers.add('Content-Disposition', cont_disp)
return response return response
elif search.request_data.get('format') == 'rss': elif output_format == 'rss':
response_rss = render( response_rss = render(
'opensearch_response_rss.xml', 'opensearch_response_rss.xml',
results=results, results=results,
q=search.request_data['q'], q=request.form['q'],
number_of_results=number_of_results, number_of_results=number_of_results,
base_url=get_base_url() base_url=get_base_url()
) )
@ -484,17 +489,17 @@ def index():
return render( return render(
'results.html', 'results.html',
results=results, results=results,
q=search.request_data['q'], q=request.form['q'],
selected_categories=search.categories, selected_categories=search_query.categories,
paging=search.paging, pageno=search_query.pageno,
time_range=search_query.time_range,
number_of_results=format_decimal(number_of_results), number_of_results=format_decimal(number_of_results),
pageno=search.pageno, advanced_search=advanced_search,
advanced_search=search.is_advanced, suggestions=result_container.suggestions,
time_range=search.time_range, answers=result_container.answers,
infoboxes=result_container.infoboxes,
paging=result_container.paging,
base_url=get_base_url(), base_url=get_base_url(),
suggestions=search.result_container.suggestions,
answers=search.result_container.answers,
infoboxes=search.result_container.infoboxes,
theme=get_current_theme_name(), theme=get_current_theme_name(),
favicons=global_favicons[themes.index(get_current_theme_name())] favicons=global_favicons[themes.index(get_current_theme_name())]
) )
@ -511,30 +516,23 @@ def about():
@app.route('/autocompleter', methods=['GET', 'POST']) @app.route('/autocompleter', methods=['GET', 'POST'])
def autocompleter(): def autocompleter():
"""Return autocompleter results""" """Return autocompleter results"""
request_data = {}
# select request method
if request.method == 'POST':
request_data = request.form
else:
request_data = request.args
# set blocked engines # set blocked engines
disabled_engines = request.preferences.engines.get_disabled() disabled_engines = request.preferences.engines.get_disabled()
# parse query # parse query
query = Query(request_data.get('q', '').encode('utf-8'), disabled_engines) raw_text_query = RawTextQuery(request.form.get('q', '').encode('utf-8'), disabled_engines)
query.parse_query() raw_text_query.parse_query()
# check if search query is set # check if search query is set
if not query.getSearchQuery(): if not raw_text_query.getSearchQuery():
return '', 400 return '', 400
# run autocompleter # run autocompleter
completer = autocomplete_backends.get(request.preferences.get_value('autocomplete')) completer = autocomplete_backends.get(request.preferences.get_value('autocomplete'))
# parse searx specific autocompleter results like !bang # parse searx specific autocompleter results like !bang
raw_results = searx_bang(query) raw_results = searx_bang(raw_text_query)
# normal autocompletion results only appear if max 3 inner results returned # normal autocompletion results only appear if max 3 inner results returned
if len(raw_results) <= 3 and completer: if len(raw_results) <= 3 and completer:
@ -545,19 +543,19 @@ def autocompleter():
else: else:
language = language.split('_')[0] language = language.split('_')[0]
# run autocompletion # run autocompletion
raw_results.extend(completer(query.getSearchQuery(), language)) raw_results.extend(completer(raw_text_query.getSearchQuery(), language))
# parse results (write :language and !engine back to result string) # parse results (write :language and !engine back to result string)
results = [] results = []
for result in raw_results: for result in raw_results:
query.changeSearchQuery(result) raw_text_query.changeSearchQuery(result)
# add parsed result # add parsed result
results.append(query.getFullQuery()) results.append(raw_text_query.getFullQuery())
# return autocompleter results # return autocompleter results
if request_data.get('format') == 'x-suggestions': if request.form.get('format') == 'x-suggestions':
return Response(json.dumps([query.query, results]), return Response(json.dumps([raw_text_query.query, results]),
mimetype='application/json') mimetype='application/json')
return Response(json.dumps(results), return Response(json.dumps(results),

View File

@ -6,9 +6,8 @@ from mock import Mock
def get_search_mock(query, **kwargs): def get_search_mock(query, **kwargs):
return {'search': Mock(query=query, return Mock(search_query=Mock(query=query, **kwargs),
result_container=Mock(answers=set()), result_container=Mock(answers=set()))
**kwargs)}
class PluginStoreTest(SearxTestCase): class PluginStoreTest(SearxTestCase):
@ -52,39 +51,39 @@ class SelfIPTest(SearxTestCase):
request = Mock(user_plugins=store.plugins, request = Mock(user_plugins=store.plugins,
remote_addr='127.0.0.1') remote_addr='127.0.0.1')
request.headers.getlist.return_value = [] request.headers.getlist.return_value = []
ctx = get_search_mock(query='ip', pageno=1) search = get_search_mock(query='ip', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertTrue('127.0.0.1' in ctx['search'].result_container.answers) self.assertTrue('127.0.0.1' in search.result_container.answers)
ctx = get_search_mock(query='ip', pageno=2) search = get_search_mock(query='ip', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertFalse('127.0.0.1' in ctx['search'].result_container.answers) self.assertFalse('127.0.0.1' in search.result_container.answers)
# User agent test # User agent test
request = Mock(user_plugins=store.plugins, request = Mock(user_plugins=store.plugins,
user_agent='Mock') user_agent='Mock')
request.headers.getlist.return_value = [] request.headers.getlist.return_value = []
ctx = get_search_mock(query='user-agent', pageno=1) search = get_search_mock(query='user-agent', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in search.result_container.answers)
ctx = get_search_mock(query='user-agent', pageno=2) search = get_search_mock(query='user-agent', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in search.result_container.answers)
ctx = get_search_mock(query='user-agent', pageno=1) search = get_search_mock(query='user-agent', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in search.result_container.answers)
ctx = get_search_mock(query='user-agent', pageno=2) search = get_search_mock(query='user-agent', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in search.result_container.answers)
ctx = get_search_mock(query='What is my User-Agent?', pageno=1) search = get_search_mock(query='What is my User-Agent?', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in search.result_container.answers)
ctx = get_search_mock(query='What is my User-Agent?', pageno=2) search = get_search_mock(query='What is my User-Agent?', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, search)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in search.result_container.answers)