Merge pull request #2484 from return42/limiter-ip_lists

[mod] limiter: blocklist and passlist (ip_lists)
This commit is contained in:
Markus Heiser 2023-06-06 09:09:20 +02:00 committed by GitHub
commit b295b497f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 15 deletions

View File

@ -15,6 +15,9 @@ Bot Detection
.. automodule:: searx.botdetection.limiter
:members:
.. automodule:: searx.botdetection.ip_lists
:members:
Rate limit
==========

View File

@ -6,8 +6,8 @@ from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
IPv4Address,
IPv6Address,
ip_address,
ip_network,
)
import flask
@ -46,11 +46,10 @@ def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkz
return flask.make_response(('Too Many Requests', 429))
def get_network(real_ip: str, cfg: config.Config) -> IPv4Network | IPv6Network:
def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network:
"""Returns the (client) network of whether the real_ip is part of."""
ip = ip_address(real_ip)
if isinstance(ip, IPv6Address):
if real_ip.version == 6:
prefix = cfg['real_ip.ipv6_prefix']
else:
prefix = cfg['real_ip.ipv4_prefix']
@ -99,7 +98,7 @@ def get_real_ip(request: flask.Request) -> str:
from .limiter import get_cfg # pylint: disable=import-outside-toplevel, cyclic-import
forwarded_for = [x.strip() for x in forwarded_for.split(',')]
x_for: int = get_cfg()['real_ip.x_for']
x_for: int = get_cfg()['real_ip.x_for'] # type: ignore
forwarded_for = forwarded_for[-min(len(forwarded_for), x_for)]
if not real_ip:

View File

@ -49,14 +49,16 @@ import werkzeug
from searx.tools import config
from searx import redisdb
from searx import logger
from searx.redislib import incr_sliding_window, drop_counter
from . import link_token
from ._helpers import too_many_requests
from ._helpers import (
too_many_requests,
logger,
)
logger = logger.getChild('botdetection.ip_limit')
logger = logger.getChild('ip_limit')
BURST_WINDOW = 20
"""Time (sec) before sliding window for *burst* requests expires."""

View File

@ -0,0 +1,85 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
""".. _botdetection.ip_lists:
Method ``ip_lists``
-------------------
The ``ip_lists`` method implements IP :py:obj:`block- <block_ip>` and
:py:obj:`pass-lists <pass_ip>`.
.. code:: toml
[botdetection.ip_lists]
pass_ip = [
'140.238.172.132', # IPv4 of check.searx.space
'192.168.0.0/16', # IPv4 private network
'fe80::/10' # IPv6 linklocal
]
block_ip = [
'93.184.216.34', # IPv4 of example.org
'257.1.1.1', # invalid IP --> will be ignored, logged in ERROR class
]
"""
# pylint: disable=unused-argument
from __future__ import annotations
from typing import Tuple
from ipaddress import (
ip_network,
IPv4Address,
IPv6Address,
)
from searx.tools import config
from ._helpers import logger
logger = logger.getChild('ip_limit')
SEARXNG_ORG = [
# https://github.com/searxng/searxng/pull/2484#issuecomment-1576639195
'140.238.172.132', # IPv4 check.searx.space
'2603:c022:0:4900::/56', # IPv6 check.searx.space
]
"""Passlist of IPs from the SearXNG organization, e.g. `check.searx.space`."""
def pass_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.pass_ip`` list.
"""
if cfg.get('botdetection.ip_lists.pass_searxng_org', default=True):
for net in SEARXNG_ORG:
net = ip_network(net, strict=False)
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in SEARXNG_ORG list."
return ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.pass_ip', cfg)
def block_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.block_ip`` list.
"""
block, msg = ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.block_ip', cfg)
if block:
msg += " To remove IP from list, please contact the maintainer of the service."
return block, msg
def ip_is_subnet_of_member_in_list(
real_ip: IPv4Address | IPv6Address, list_name: str, cfg: config.Config
) -> Tuple[bool, str]:
for net in cfg.get(list_name, default=[]):
try:
net = ip_network(net, strict=False)
except ValueError:
logger.error("invalid IP %s in %s", net, list_name)
continue
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in {list_name}."
return False, f"IP is not a member of an item in the f{list_name} list"

View File

@ -40,6 +40,7 @@ and set the redis-url connection. Check the value, it depends on your redis DB
from __future__ import annotations
from pathlib import Path
from ipaddress import ip_address
import flask
import werkzeug
@ -53,6 +54,7 @@ from . import (
http_connection,
http_user_agent,
ip_limit,
ip_lists,
)
from ._helpers import (
@ -84,16 +86,41 @@ def get_cfg() -> config.Config:
def filter_request(request: flask.Request) -> werkzeug.Response | None:
# pylint: disable=too-many-return-statements
cfg = get_cfg()
real_ip = get_real_ip(request)
real_ip = ip_address(get_real_ip(request))
network = get_network(real_ip, cfg)
if network.is_link_local:
return None
if request.path == '/healthz':
return None
# link-local
if network.is_link_local:
return None
# block- & pass- lists
#
# 1. The IP of the request is first checked against the pass-list; if the IP
# matches an entry in the list, the request is not blocked.
# 2. If no matching entry is found in the pass-list, then a check is made against
# the block list; if the IP matches an entry in the list, the request is
# blocked.
# 3. If the IP is not in either list, the request is not blocked.
match, msg = ip_lists.pass_ip(real_ip, cfg)
if match:
logger.warning("PASS %s: matched PASSLIST - %s", network.compressed, msg)
return None
match, msg = ip_lists.block_ip(real_ip, cfg)
if match:
logger.error("BLOCK %s: matched BLOCKLIST - %s", network.compressed, msg)
return flask.make_response(('IP is on BLOCKLIST - %s' % msg, 429))
# methods applied on /
for func in [
http_user_agent,
]:
@ -101,6 +128,8 @@ def filter_request(request: flask.Request) -> werkzeug.Response | None:
if val is not None:
return val
# methods applied on /search
if request.path == '/search':
for func in [

View File

@ -16,7 +16,25 @@ ipv6_prefix = 48
# (networks) are not monitored by the ip_limit
filter_link_local = false
# acrivate link_token method in the ip_limit method
# activate link_token method in the ip_limit method
link_token = false
[botdetection.ip_lists]
# In the limiter, the ip_lists method has priority over all other methods -> if
# an IP is in the pass_ip list, it has unrestricted access and it is also not
# checked if e.g. the "user agent" suggests a bot (e.g. curl).
block_ip = [
# '93.184.216.34', # IPv4 of example.org
# '257.1.1.1', # invalid IP --> will be ignored, logged in ERROR class
]
pass_ip = [
# '192.168.0.0/16', # IPv4 private network
# 'fe80::/10' # IPv6 linklocal / wins over botdetection.ip_limit.filter_link_local
]
# Activate passlist of (hardcoded) IPs from the SearXNG organization,
# e.g. `check.searx.space`.
pass_searxng_org = true

View File

@ -39,6 +39,7 @@ from __future__ import annotations
from ipaddress import (
IPv4Network,
IPv6Network,
ip_address,
)
import string
@ -107,7 +108,7 @@ def ping(request: flask.Request, token: str):
return
cfg = limiter.get_cfg()
real_ip = get_real_ip(request)
real_ip = ip_address(get_real_ip(request))
network = get_network(real_ip, cfg)
ping_key = get_ping_key(network, request)

View File

@ -8,6 +8,7 @@ structured dictionaries. The configuration schema is defined in a dictionary
structure and the configuration data is given in a dictionary structure.
"""
from __future__ import annotations
from typing import Any
import copy
import typing
@ -97,7 +98,7 @@ class Config:
self.deprecated = deprecated
self.cfg = copy.deepcopy(cfg_schema)
def __getitem__(self, key: str):
def __getitem__(self, key: str) -> Any:
return self.get(key)
def validate(self, cfg: dict):
@ -115,7 +116,7 @@ class Config:
"""Returns default value of field ``name`` in ``self.cfg_schema``."""
return value(name, self.cfg_schema)
def get(self, name: str, default=UNSET, replace=True):
def get(self, name: str, default: Any = UNSET, replace: bool = True) -> Any:
"""Returns the value to which ``name`` points in the configuration.
If there is no such ``name`` in the config and the ``default`` is