Clean up the architecture

Purposes :
- isolate the plugins calls
- distinction between parsing the web request and running the search (Search class). To be able to test code easily, to run searx code outside a web server, to filter the search query parameters with plugins more easily, etc...

Details :
- request.request_data contains request.form or request.args (initialize inside pre_request() function)
- Query class is renamed RawTextQuery
- SearchQuery class defines all search parameters
- get_search_query_from_webapp create a SearchQuery instance (basically the previous Search.__init__ code)
- Search class and SearchWithPlugins class takes a SearchQuery instance as class constructor parameter
- SearchWithPlugins class inherites from Search class, and run plugins
- A dedicated function search_with_plugins executes plugins to have a well define locals() (which is used by the plugins code).
- All plugins code is executed inside the try...except block (webapp.py, index function)
- advanced_search HTTP parameter value stays in webapp.py (it is only part of UI)
- multiple calls to result_container.get_ordered_results() doesn't compute the order multiple time (note : this method was call only once before)
- paging value is stored in the result_container class (compute in the extend method)
- test about engine.suspend_end_time is done during search method call (instead of __init__)
- check that the format parameter value is one of these : html, rss, json, rss (before the html value was assumed but some text formatting wasn't not done)
This commit is contained in:
dalf 2016-10-22 13:10:31 +02:00 committed by Alexandre Flament
parent 142cd87095
commit 67e11c42b9
6 changed files with 272 additions and 202 deletions

View File

@ -37,10 +37,10 @@ def post_search(request, ctx):
ip = x_forwarded_for[0] ip = x_forwarded_for[0]
else: else:
ip = request.remote_addr ip = request.remote_addr
ctx['search'].result_container.answers.clear() ctx['result_container'].answers.clear()
ctx['search'].result_container.answers.add(ip) ctx['result_container'].answers.add(ip)
elif p.match(ctx['search'].query): elif p.match(ctx['search'].query):
ua = request.user_agent ua = request.user_agent
ctx['search'].result_container.answers.clear() ctx['result_container'].answers.clear()
ctx['search'].result_container.answers.add(ua) ctx['result_container'].answers.add(ua)
return True return True

View File

@ -25,8 +25,8 @@ import string
import re import re
class Query(object): class RawTextQuery(object):
"""parse query""" """parse raw text query (the value from the html input)"""
def __init__(self, query, disabled_engines): def __init__(self, query, disabled_engines):
self.query = query self.query = query
@ -130,3 +130,19 @@ class Query(object):
def getFullQuery(self): def getFullQuery(self):
# get full querry including whitespaces # get full querry including whitespaces
return string.join(self.query_parts, '') return string.join(self.query_parts, '')
class SearchQuery(object):
"""container for all the search parameters (query, language, etc...)"""
def __init__(self, query, engines, categories, lang, safesearch, pageno, time_range):
self.query = query
self.engines = engines
self.categories = categories
self.lang = lang
self.safesearch = safesearch
self.pageno = pageno
self.time_range = time_range
def __str__(self):
return str(self.query) + ";" + str(self.engines)

View File

@ -128,6 +128,8 @@ class ResultContainer(object):
self.suggestions = set() self.suggestions = set()
self.answers = set() self.answers = set()
self._number_of_results = [] self._number_of_results = []
self._ordered = False
self.paging = False
def extend(self, engine_name, results): def extend(self, engine_name, results):
for result in list(results): for result in list(results):
@ -153,6 +155,9 @@ class ResultContainer(object):
self.results[engine_name].extend(results) self.results[engine_name].extend(results)
if not self.paging and engines[engine_name].paging:
self.paging = True
for i, result in enumerate(results): for i, result in enumerate(results):
try: try:
result['url'] = result['url'].decode('utf-8') result['url'] = result['url'].decode('utf-8')
@ -219,7 +224,7 @@ class ResultContainer(object):
with RLock(): with RLock():
self._merged_results.append(result) self._merged_results.append(result)
def get_ordered_results(self): def order_results(self):
for result in self._merged_results: for result in self._merged_results:
score = result_score(result) score = result_score(result)
result['score'] = score result['score'] = score
@ -269,8 +274,14 @@ class ResultContainer(object):
# update categoryIndex # update categoryIndex
categoryPositions[category] = {'index': len(gresults), 'count': 8} categoryPositions[category] = {'index': len(gresults), 'count': 8}
# return gresults # update _merged_results
return gresults self._ordered = True
self._merged_results = gresults
def get_ordered_results(self):
if not self._ordered:
self.order_results()
return self._merged_results
def results_length(self): def results_length(self):
return len(self._merged_results) return len(self._merged_results)

View File

@ -25,9 +25,10 @@ from searx.engines import (
categories, engines categories, engines
) )
from searx.utils import gen_useragent from searx.utils import gen_useragent
from searx.query import Query from searx.query import RawTextQuery, SearchQuery
from searx.results import ResultContainer from searx.results import ResultContainer
from searx import logger from searx import logger
from searx.plugins import plugins
logger = logger.getChild('search') logger = logger.getChild('search')
@ -127,86 +128,72 @@ def make_callback(engine_name, callback, params, result_container):
return process_callback return process_callback
class Search(object): def get_search_query_from_webapp(preferences, request_data):
query = None
"""Search information container""" query_engines = []
query_categories = []
def __init__(self, request): query_paging = False
# init vars query_pageno = 1
super(Search, self).__init__() query_lang = 'all'
self.query = None query_time_range = None
self.engines = []
self.categories = []
self.paging = False
self.pageno = 1
self.lang = 'all'
self.time_range = None
self.is_advanced = None
# set blocked engines # set blocked engines
self.disabled_engines = request.preferences.engines.get_disabled() disabled_engines = preferences.engines.get_disabled()
self.result_container = ResultContainer()
self.request_data = {}
# set specific language if set # set specific language if set
self.lang = request.preferences.get_value('language') query_lang = preferences.get_value('language')
# set request method # safesearch
if request.method == 'POST': query_safesearch = preferences.get_value('safesearch')
self.request_data = request.form
else:
self.request_data = request.args
# TODO better exceptions # TODO better exceptions
if not self.request_data.get('q'): if not request_data.get('q'):
raise Exception('noquery') raise Exception('noquery')
# set pagenumber # set pagenumber
pageno_param = self.request_data.get('pageno', '1') pageno_param = request_data.get('pageno', '1')
if not pageno_param.isdigit() or int(pageno_param) < 1: if not pageno_param.isdigit() or int(pageno_param) < 1:
pageno_param = 1 pageno_param = 1
self.pageno = int(pageno_param) query_pageno = int(pageno_param)
# parse query, if tags are set, which change # parse query, if tags are set, which change
# the serch engine or search-language # the serch engine or search-language
query_obj = Query(self.request_data['q'], self.disabled_engines) raw_text_query = RawTextQuery(request_data['q'], disabled_engines)
query_obj.parse_query() raw_text_query.parse_query()
# set query # set query
self.query = query_obj.getSearchQuery() query = raw_text_query.getSearchQuery()
# get last selected language in query, if possible # get last selected language in query, if possible
# TODO support search with multible languages # TODO support search with multible languages
if len(query_obj.languages): if len(raw_text_query.languages):
self.lang = query_obj.languages[-1] query_lang = raw_text_query.languages[-1]
self.time_range = self.request_data.get('time_range') query_time_range = request_data.get('time_range')
self.is_advanced = self.request_data.get('advanced_search')
self.engines = query_obj.engines query_engines = raw_text_query.engines
# if engines are calculated from query, # if engines are calculated from query,
# set categories by using that informations # set categories by using that informations
if self.engines and query_obj.specific: if query_engines and raw_text_query.specific:
self.categories = list(set(engine['category'] query_categories = list(set(engine['category']
for engine in self.engines)) for engine in query_engines))
# otherwise, using defined categories to # otherwise, using defined categories to
# calculate which engines should be used # calculate which engines should be used
else: else:
# set categories/engines # set categories/engines
load_default_categories = True load_default_categories = True
for pd_name, pd in self.request_data.items(): for pd_name, pd in request_data.items():
if pd_name == 'categories': if pd_name == 'categories':
self.categories.extend(categ for categ in map(unicode.strip, pd.split(',')) if categ in categories) query_categories.extend(categ for categ in map(unicode.strip, pd.split(',')) if categ in categories)
elif pd_name == 'engines': elif pd_name == 'engines':
pd_engines = [{'category': engines[engine].categories[0], pd_engines = [{'category': engines[engine].categories[0],
'name': engine} 'name': engine}
for engine in map(unicode.strip, pd.split(',')) if engine in engines] for engine in map(unicode.strip, pd.split(',')) if engine in engines]
if pd_engines: if pd_engines:
self.engines.extend(pd_engines) query_engines.extend(pd_engines)
load_default_categories = False load_default_categories = False
elif pd_name.startswith('category_'): elif pd_name.startswith('category_'):
category = pd_name[9:] category = pd_name[9:]
@ -217,45 +204,54 @@ class Search(object):
if pd != 'off': if pd != 'off':
# add category to list # add category to list
self.categories.append(category) query_categories.append(category)
elif category in self.categories: elif category in query_categories:
# remove category from list if property is set to 'off' # remove category from list if property is set to 'off'
self.categories.remove(category) query_categories.remove(category)
if not load_default_categories: if not load_default_categories:
if not self.categories: if not query_categories:
self.categories = list(set(engine['category'] query_categories = list(set(engine['category']
for engine in self.engines)) for engine in engines))
return else:
# if no category is specified for this search, # if no category is specified for this search,
# using user-defined default-configuration which # using user-defined default-configuration which
# (is stored in cookie) # (is stored in cookie)
if not self.categories: if not query_categories:
cookie_categories = request.preferences.get_value('categories') cookie_categories = preferences.get_value('categories')
for ccateg in cookie_categories: for ccateg in cookie_categories:
if ccateg in categories: if ccateg in categories:
self.categories.append(ccateg) query_categories.append(ccateg)
# if still no category is specified, using general # if still no category is specified, using general
# as default-category # as default-category
if not self.categories: if not query_categories:
self.categories = ['general'] query_categories = ['general']
# using all engines for that search, which are # using all engines for that search, which are
# declared under the specific categories # declared under the specific categories
for categ in self.categories: for categ in query_categories:
self.engines.extend({'category': categ, query_engines.extend({'category': categ,
'name': engine.name} 'name': engine.name}
for engine in categories[categ] for engine in categories[categ]
if (engine.name, categ) not in self.disabled_engines) if (engine.name, categ) not in disabled_engines)
# remove suspended engines return SearchQuery(query, query_engines, query_categories,
self.engines = [e for e in self.engines query_lang, query_safesearch, query_pageno, query_time_range)
if engines[e['name']].suspend_end_time <= time()]
class Search(object):
"""Search information container"""
def __init__(self, search_query):
# init vars
super(Search, self).__init__()
self.search_query = search_query
self.result_container = ResultContainer()
# do search-request # do search-request
def search(self, request): def search(self):
global number_of_searches global number_of_searches
# init vars # init vars
@ -268,23 +264,30 @@ class Search(object):
# user_agent = request.headers.get('User-Agent', '') # user_agent = request.headers.get('User-Agent', '')
user_agent = gen_useragent() user_agent = gen_useragent()
search_query = self.search_query
# start search-reqest for all selected engines # start search-reqest for all selected engines
for selected_engine in self.engines: for selected_engine in search_query.engines:
if selected_engine['name'] not in engines: if selected_engine['name'] not in engines:
continue continue
engine = engines[selected_engine['name']] engine = engines[selected_engine['name']]
# skip suspended engines
if engine.suspend_end_time and engine.suspend_end_time <= time():
continue
# if paging is not supported, skip # if paging is not supported, skip
if self.pageno > 1 and not engine.paging: if search_query.pageno > 1 and not engine.paging:
continue continue
# if search-language is set and engine does not # if search-language is set and engine does not
# provide language-support, skip # provide language-support, skip
if self.lang != 'all' and not engine.language_support: if search_query.lang != 'all' and not engine.language_support:
continue continue
if self.time_range and not engine.time_range_support: # if time_range is not supported, skip
if search_query.time_range and not engine.time_range_support:
continue continue
# set default request parameters # set default request parameters
@ -292,21 +295,20 @@ class Search(object):
request_params['headers']['User-Agent'] = user_agent request_params['headers']['User-Agent'] = user_agent
request_params['category'] = selected_engine['category'] request_params['category'] = selected_engine['category']
request_params['started'] = time() request_params['started'] = time()
request_params['pageno'] = self.pageno request_params['pageno'] = search_query.pageno
if hasattr(engine, 'language') and engine.language: if hasattr(engine, 'language') and engine.language:
request_params['language'] = engine.language request_params['language'] = engine.language
else: else:
request_params['language'] = self.lang request_params['language'] = search_query.lang
# 0 = None, 1 = Moderate, 2 = Strict # 0 = None, 1 = Moderate, 2 = Strict
request_params['safesearch'] = request.preferences.get_value('safesearch') request_params['safesearch'] = search_query.safesearch
request_params['time_range'] = self.time_range request_params['time_range'] = search_query.time_range
request_params['advanced_search'] = self.is_advanced
# update request parameters dependent on # update request parameters dependent on
# search-engine (contained in engines folder) # search-engine (contained in engines folder)
engine.request(self.query.encode('utf-8'), request_params) engine.request(search_query.query.encode('utf-8'), request_params)
if request_params['url'] is None: if request_params['url'] is None:
# TODO add support of offline engines # TODO add support of offline engines
@ -346,10 +348,44 @@ class Search(object):
selected_engine['name'])) selected_engine['name']))
if not requests: if not requests:
return self return self.result_container
# send all search-request # send all search-request
threaded_requests(requests) threaded_requests(requests)
start_new_thread(gc.collect, tuple()) start_new_thread(gc.collect, tuple())
# return results, suggestions, answers and infoboxes # return results, suggestions, answers and infoboxes
return self return self.result_container
def search_with_plugins(do_search, search_query, request, request_data, result_container):
"""Search using the do_search function and with plugins filtering.
Standalone function to have a well define locals().
result_container contains the results after the function call.
"""
search = search_query
if plugins.call('pre_search', request, locals()):
do_search()
plugins.call('post_search', request, locals())
results = result_container.get_ordered_results()
for result in results:
plugins.call('on_result', request, locals())
class SearchWithPlugins(Search):
def __init__(self, search_query, request):
super(SearchWithPlugins, self).__init__(search_query)
self.request = request
self.request_data = request.request_data
def search(self):
def do_search():
super(SearchWithPlugins, self).search()
search_with_plugins(do_search, self.search_query, self.request, self.request_data, self.result_container)
return self.result_container

View File

@ -62,8 +62,8 @@ from searx.utils import (
) )
from searx.version import VERSION_STRING from searx.version import VERSION_STRING
from searx.languages import language_codes from searx.languages import language_codes
from searx.search import Search from searx.search import Search, SearchWithPlugins, get_search_query_from_webapp
from searx.query import Query from searx.query import RawTextQuery, SearchQuery
from searx.autocomplete import searx_bang, backends as autocomplete_backends from searx.autocomplete import searx_bang, backends as autocomplete_backends
from searx.plugins import plugins from searx.plugins import plugins
from searx.preferences import Preferences, ValidationException from searx.preferences import Preferences, ValidationException
@ -364,6 +364,16 @@ def render(template_name, override_theme=None, **kwargs):
@app.before_request @app.before_request
def pre_request(): def pre_request():
# request.request_data
if request.method == 'POST':
request_data = request.form
elif request.method == 'GET':
request_data = request.args
else:
request_data = {}
request.request_data = request_data
# merge GET, POST vars # merge GET, POST vars
preferences = Preferences(themes, categories.keys(), engines, plugins) preferences = Preferences(themes, categories.keys(), engines, plugins)
try: try:
@ -373,11 +383,13 @@ def pre_request():
logger.warning('Invalid config') logger.warning('Invalid config')
request.preferences = preferences request.preferences = preferences
# request.form
request.form = dict(request.form.items()) request.form = dict(request.form.items())
for k, v in request.args.items(): for k, v in request.args.items():
if k not in request.form: if k not in request.form:
request.form[k] = v request.form[k] = v
# request.user_plugins
request.user_plugins = [] request.user_plugins = []
allowed_plugins = preferences.plugins.get_enabled() allowed_plugins = preferences.plugins.get_enabled()
disabled_plugins = preferences.plugins.get_disabled() disabled_plugins = preferences.plugins.get_disabled()
@ -400,30 +412,33 @@ def index():
'index.html', 'index.html',
) )
# search
search_query = None
result_container = None
try: try:
search = Search(request) search_query = get_search_query_from_webapp(request.preferences, request.request_data)
# search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request)
result_container = search.search()
except: except:
return render( return render(
'index.html', 'index.html',
) )
if plugins.call('pre_search', request, locals()): results = result_container.get_ordered_results()
search.search(request)
plugins.call('post_search', request, locals()) # UI
advanced_search = request.request_data.get('advanced_search', None)
results = search.result_container.get_ordered_results() output_format = request.request_data.get('format', 'html')
if output_format not in ['html', 'csv', 'json', 'rss']:
output_format = 'html'
# output
for result in results: for result in results:
if output_format == 'html':
plugins.call('on_result', request, locals())
if not search.paging and engines[result['engine']].paging:
search.paging = True
if search.request_data.get('format', 'html') == 'html':
if 'content' in result and result['content']: if 'content' in result and result['content']:
result['content'] = highlight_content(result['content'][:1024], search.query.encode('utf-8')) result['content'] = highlight_content(result['content'][:1024], search_query.query.encode('utf-8'))
result['title'] = highlight_content(result['title'], search.query.encode('utf-8')) result['title'] = highlight_content(result['title'], search_query.query.encode('utf-8'))
else: else:
if result.get('content'): if result.get('content'):
result['content'] = html_to_text(result['content']).strip() result['content'] = html_to_text(result['content']).strip()
@ -450,16 +465,16 @@ def index():
else: else:
result['publishedDate'] = format_date(result['publishedDate']) result['publishedDate'] = format_date(result['publishedDate'])
number_of_results = search.result_container.results_number() number_of_results = result_container.results_number()
if number_of_results < search.result_container.results_length(): if number_of_results < result_container.results_length():
number_of_results = 0 number_of_results = 0
if search.request_data.get('format') == 'json': if output_format == 'json':
return Response(json.dumps({'query': search.query, return Response(json.dumps({'query': search_query.query,
'number_of_results': number_of_results, 'number_of_results': number_of_results,
'results': results}), 'results': results}),
mimetype='application/json') mimetype='application/json')
elif search.request_data.get('format') == 'csv': elif output_format == 'csv':
csv = UnicodeWriter(cStringIO.StringIO()) csv = UnicodeWriter(cStringIO.StringIO())
keys = ('title', 'url', 'content', 'host', 'engine', 'score') keys = ('title', 'url', 'content', 'host', 'engine', 'score')
csv.writerow(keys) csv.writerow(keys)
@ -468,14 +483,14 @@ def index():
csv.writerow([row.get(key, '') for key in keys]) csv.writerow([row.get(key, '') for key in keys])
csv.stream.seek(0) csv.stream.seek(0)
response = Response(csv.stream.read(), mimetype='application/csv') response = Response(csv.stream.read(), mimetype='application/csv')
cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search.query.encode('utf-8')) cont_disp = 'attachment;Filename=searx_-_{0}.csv'.format(search_query.query.encode('utf-8'))
response.headers.add('Content-Disposition', cont_disp) response.headers.add('Content-Disposition', cont_disp)
return response return response
elif search.request_data.get('format') == 'rss': elif output_format == 'rss':
response_rss = render( response_rss = render(
'opensearch_response_rss.xml', 'opensearch_response_rss.xml',
results=results, results=results,
q=search.request_data['q'], q=request.request_data['q'],
number_of_results=number_of_results, number_of_results=number_of_results,
base_url=get_base_url() base_url=get_base_url()
) )
@ -484,17 +499,17 @@ def index():
return render( return render(
'results.html', 'results.html',
results=results, results=results,
q=search.request_data['q'], q=request.request_data['q'],
selected_categories=search.categories, selected_categories=search_query.categories,
paging=search.paging, pageno=search_query.pageno,
time_range=search_query.time_range,
number_of_results=format_decimal(number_of_results), number_of_results=format_decimal(number_of_results),
pageno=search.pageno, advanced_search=advanced_search,
advanced_search=search.is_advanced, suggestions=result_container.suggestions,
time_range=search.time_range, answers=result_container.answers,
infoboxes=result_container.infoboxes,
paging=result_container.paging,
base_url=get_base_url(), base_url=get_base_url(),
suggestions=search.result_container.suggestions,
answers=search.result_container.answers,
infoboxes=search.result_container.infoboxes,
theme=get_current_theme_name(), theme=get_current_theme_name(),
favicons=global_favicons[themes.index(get_current_theme_name())] favicons=global_favicons[themes.index(get_current_theme_name())]
) )
@ -511,30 +526,23 @@ def about():
@app.route('/autocompleter', methods=['GET', 'POST']) @app.route('/autocompleter', methods=['GET', 'POST'])
def autocompleter(): def autocompleter():
"""Return autocompleter results""" """Return autocompleter results"""
request_data = {}
# select request method
if request.method == 'POST':
request_data = request.form
else:
request_data = request.args
# set blocked engines # set blocked engines
disabled_engines = request.preferences.engines.get_disabled() disabled_engines = request.preferences.engines.get_disabled()
# parse query # parse query
query = Query(request_data.get('q', '').encode('utf-8'), disabled_engines) raw_text_query = RawTextQuery(request.request_data.get('q', '').encode('utf-8'), disabled_engines)
query.parse_query() raw_text_query.parse_query()
# check if search query is set # check if search query is set
if not query.getSearchQuery(): if not raw_text_query.getSearchQuery():
return '', 400 return '', 400
# run autocompleter # run autocompleter
completer = autocomplete_backends.get(request.preferences.get_value('autocomplete')) completer = autocomplete_backends.get(request.preferences.get_value('autocomplete'))
# parse searx specific autocompleter results like !bang # parse searx specific autocompleter results like !bang
raw_results = searx_bang(query) raw_results = searx_bang(raw_text_query)
# normal autocompletion results only appear if max 3 inner results returned # normal autocompletion results only appear if max 3 inner results returned
if len(raw_results) <= 3 and completer: if len(raw_results) <= 3 and completer:
@ -545,19 +553,19 @@ def autocompleter():
else: else:
language = language.split('_')[0] language = language.split('_')[0]
# run autocompletion # run autocompletion
raw_results.extend(completer(query.getSearchQuery(), language)) raw_results.extend(completer(raw_text_query.getSearchQuery(), language))
# parse results (write :language and !engine back to result string) # parse results (write :language and !engine back to result string)
results = [] results = []
for result in raw_results: for result in raw_results:
query.changeSearchQuery(result) raw_text_query.changeSearchQuery(result)
# add parsed result # add parsed result
results.append(query.getFullQuery()) results.append(raw_text_query.getFullQuery())
# return autocompleter results # return autocompleter results
if request_data.get('format') == 'x-suggestions': if request.request_data.get('format') == 'x-suggestions':
return Response(json.dumps([query.query, results]), return Response(json.dumps([raw_text_query.query, results]),
mimetype='application/json') mimetype='application/json')
return Response(json.dumps(results), return Response(json.dumps(results),

View File

@ -6,9 +6,8 @@ from mock import Mock
def get_search_mock(query, **kwargs): def get_search_mock(query, **kwargs):
return {'search': Mock(query=query, return {'search': Mock(query=query, **kwargs),
result_container=Mock(answers=set()), 'result_container': Mock(answers=set())}
**kwargs)}
class PluginStoreTest(SearxTestCase): class PluginStoreTest(SearxTestCase):
@ -54,11 +53,11 @@ class SelfIPTest(SearxTestCase):
request.headers.getlist.return_value = [] request.headers.getlist.return_value = []
ctx = get_search_mock(query='ip', pageno=1) ctx = get_search_mock(query='ip', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertTrue('127.0.0.1' in ctx['search'].result_container.answers) self.assertTrue('127.0.0.1' in ctx['result_container'].answers)
ctx = get_search_mock(query='ip', pageno=2) ctx = get_search_mock(query='ip', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertFalse('127.0.0.1' in ctx['search'].result_container.answers) self.assertFalse('127.0.0.1' in ctx['result_container'].answers)
# User agent test # User agent test
request = Mock(user_plugins=store.plugins, request = Mock(user_plugins=store.plugins,
@ -67,24 +66,24 @@ class SelfIPTest(SearxTestCase):
ctx = get_search_mock(query='user-agent', pageno=1) ctx = get_search_mock(query='user-agent', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in ctx['result_container'].answers)
ctx = get_search_mock(query='user-agent', pageno=2) ctx = get_search_mock(query='user-agent', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in ctx['result_container'].answers)
ctx = get_search_mock(query='user-agent', pageno=1) ctx = get_search_mock(query='user-agent', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in ctx['result_container'].answers)
ctx = get_search_mock(query='user-agent', pageno=2) ctx = get_search_mock(query='user-agent', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in ctx['result_container'].answers)
ctx = get_search_mock(query='What is my User-Agent?', pageno=1) ctx = get_search_mock(query='What is my User-Agent?', pageno=1)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertTrue('Mock' in ctx['search'].result_container.answers) self.assertTrue('Mock' in ctx['result_container'].answers)
ctx = get_search_mock(query='What is my User-Agent?', pageno=2) ctx = get_search_mock(query='What is my User-Agent?', pageno=2)
store.call('post_search', request, ctx) store.call('post_search', request, ctx)
self.assertFalse('Mock' in ctx['search'].result_container.answers) self.assertFalse('Mock' in ctx['result_container'].answers)