first commit
This commit is contained in:
commit
205faf4224
5471 changed files with 973850 additions and 0 deletions
|
|
@ -0,0 +1,22 @@
|
|||
from ._json import json
|
||||
from .encoding import base64_decode as base64_decode
|
||||
from .encoding import base64_encode as base64_encode
|
||||
from .encoding import want_bytes as want_bytes
|
||||
from .exc import BadData as BadData
|
||||
from .exc import BadHeader as BadHeader
|
||||
from .exc import BadPayload as BadPayload
|
||||
from .exc import BadSignature as BadSignature
|
||||
from .exc import BadTimeSignature as BadTimeSignature
|
||||
from .exc import SignatureExpired as SignatureExpired
|
||||
from .jws import JSONWebSignatureSerializer
|
||||
from .jws import TimedJSONWebSignatureSerializer
|
||||
from .serializer import Serializer as Serializer
|
||||
from .signer import HMACAlgorithm as HMACAlgorithm
|
||||
from .signer import NoneAlgorithm as NoneAlgorithm
|
||||
from .signer import Signer as Signer
|
||||
from .timed import TimedSerializer as TimedSerializer
|
||||
from .timed import TimestampSigner as TimestampSigner
|
||||
from .url_safe import URLSafeSerializer as URLSafeSerializer
|
||||
from .url_safe import URLSafeTimedSerializer as URLSafeTimedSerializer
|
||||
|
||||
__version__ = "2.0.1"
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
import json as _json
|
||||
import typing as _t
|
||||
from types import ModuleType
|
||||
|
||||
|
||||
class _CompactJSON:
|
||||
"""Wrapper around json module that strips whitespace."""
|
||||
|
||||
@staticmethod
|
||||
def loads(payload: _t.Union[str, bytes]) -> _t.Any:
|
||||
return _json.loads(payload)
|
||||
|
||||
@staticmethod
|
||||
def dumps(obj: _t.Any, **kwargs: _t.Any) -> str:
|
||||
kwargs.setdefault("ensure_ascii", False)
|
||||
kwargs.setdefault("separators", (",", ":"))
|
||||
return _json.dumps(obj, **kwargs)
|
||||
|
||||
|
||||
class DeprecatedJSON(ModuleType):
|
||||
def __getattribute__(self, item: str) -> _t.Any:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"Importing 'itsdangerous.json' is deprecated and will be"
|
||||
" removed in ItsDangerous 2.1. Use Python's 'json' module"
|
||||
" instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
return getattr(_json, item)
|
||||
|
||||
|
||||
json = DeprecatedJSON("json")
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
import base64
|
||||
import string
|
||||
import struct
|
||||
import typing as _t
|
||||
|
||||
from .exc import BadData
|
||||
|
||||
_t_str_bytes = _t.Union[str, bytes]
|
||||
|
||||
|
||||
def want_bytes(
|
||||
s: _t_str_bytes, encoding: str = "utf-8", errors: str = "strict"
|
||||
) -> bytes:
|
||||
if isinstance(s, str):
|
||||
s = s.encode(encoding, errors)
|
||||
|
||||
return s
|
||||
|
||||
|
||||
def base64_encode(string: _t_str_bytes) -> bytes:
|
||||
"""Base64 encode a string of bytes or text. The resulting bytes are
|
||||
safe to use in URLs.
|
||||
"""
|
||||
string = want_bytes(string)
|
||||
return base64.urlsafe_b64encode(string).rstrip(b"=")
|
||||
|
||||
|
||||
def base64_decode(string: _t_str_bytes) -> bytes:
|
||||
"""Base64 decode a URL-safe string of bytes or text. The result is
|
||||
bytes.
|
||||
"""
|
||||
string = want_bytes(string, encoding="ascii", errors="ignore")
|
||||
string += b"=" * (-len(string) % 4)
|
||||
|
||||
try:
|
||||
return base64.urlsafe_b64decode(string)
|
||||
except (TypeError, ValueError):
|
||||
raise BadData("Invalid base64-encoded data")
|
||||
|
||||
|
||||
# The alphabet used by base64.urlsafe_*
|
||||
_base64_alphabet = f"{string.ascii_letters}{string.digits}-_=".encode("ascii")
|
||||
|
||||
_int64_struct = struct.Struct(">Q")
|
||||
_int_to_bytes = _int64_struct.pack
|
||||
_bytes_to_int = _t.cast("_t.Callable[[bytes], _t.Tuple[int]]", _int64_struct.unpack)
|
||||
|
||||
|
||||
def int_to_bytes(num: int) -> bytes:
|
||||
return _int_to_bytes(num).lstrip(b"\x00")
|
||||
|
||||
|
||||
def bytes_to_int(bytestr: bytes) -> int:
|
||||
return _bytes_to_int(bytestr.rjust(8, b"\x00"))[0]
|
||||
|
|
@ -0,0 +1,107 @@
|
|||
import typing as _t
|
||||
from datetime import datetime
|
||||
|
||||
_t_opt_any = _t.Optional[_t.Any]
|
||||
_t_opt_exc = _t.Optional[Exception]
|
||||
|
||||
|
||||
class BadData(Exception):
|
||||
"""Raised if bad data of any sort was encountered. This is the base
|
||||
for all exceptions that ItsDangerous defines.
|
||||
|
||||
.. versionadded:: 0.15
|
||||
"""
|
||||
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.message
|
||||
|
||||
|
||||
class BadSignature(BadData):
|
||||
"""Raised if a signature does not match."""
|
||||
|
||||
def __init__(self, message: str, payload: _t_opt_any = None):
|
||||
super().__init__(message)
|
||||
|
||||
#: The payload that failed the signature test. In some
|
||||
#: situations you might still want to inspect this, even if
|
||||
#: you know it was tampered with.
|
||||
#:
|
||||
#: .. versionadded:: 0.14
|
||||
self.payload: _t_opt_any = payload
|
||||
|
||||
|
||||
class BadTimeSignature(BadSignature):
|
||||
"""Raised if a time-based signature is invalid. This is a subclass
|
||||
of :class:`BadSignature`.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
payload: _t_opt_any = None,
|
||||
date_signed: _t.Optional[datetime] = None,
|
||||
):
|
||||
super().__init__(message, payload)
|
||||
|
||||
#: If the signature expired this exposes the date of when the
|
||||
#: signature was created. This can be helpful in order to
|
||||
#: tell the user how long a link has been gone stale.
|
||||
#:
|
||||
#: .. versionchanged:: 2.0
|
||||
#: The datetime value is timezone-aware rather than naive.
|
||||
#:
|
||||
#: .. versionadded:: 0.14
|
||||
self.date_signed = date_signed
|
||||
|
||||
|
||||
class SignatureExpired(BadTimeSignature):
|
||||
"""Raised if a signature timestamp is older than ``max_age``. This
|
||||
is a subclass of :exc:`BadTimeSignature`.
|
||||
"""
|
||||
|
||||
|
||||
class BadHeader(BadSignature):
|
||||
"""Raised if a signed header is invalid in some form. This only
|
||||
happens for serializers that have a header that goes with the
|
||||
signature.
|
||||
|
||||
.. versionadded:: 0.24
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
payload: _t_opt_any = None,
|
||||
header: _t_opt_any = None,
|
||||
original_error: _t_opt_exc = None,
|
||||
):
|
||||
super().__init__(message, payload)
|
||||
|
||||
#: If the header is actually available but just malformed it
|
||||
#: might be stored here.
|
||||
self.header: _t_opt_any = header
|
||||
|
||||
#: If available, the error that indicates why the payload was
|
||||
#: not valid. This might be ``None``.
|
||||
self.original_error: _t_opt_exc = original_error
|
||||
|
||||
|
||||
class BadPayload(BadData):
|
||||
"""Raised if a payload is invalid. This could happen if the payload
|
||||
is loaded despite an invalid signature, or if there is a mismatch
|
||||
between the serializer and deserializer. The original exception
|
||||
that occurred during loading is stored on as :attr:`original_error`.
|
||||
|
||||
.. versionadded:: 0.15
|
||||
"""
|
||||
|
||||
def __init__(self, message: str, original_error: _t_opt_exc = None):
|
||||
super().__init__(message)
|
||||
|
||||
#: If available, the error that indicates why the payload was
|
||||
#: not valid. This might be ``None``.
|
||||
self.original_error: _t_opt_exc = original_error
|
||||
|
|
@ -0,0 +1,259 @@
|
|||
import hashlib
|
||||
import time
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from decimal import Decimal
|
||||
from numbers import Real
|
||||
|
||||
from ._json import _CompactJSON
|
||||
from .encoding import base64_decode
|
||||
from .encoding import base64_encode
|
||||
from .encoding import want_bytes
|
||||
from .exc import BadData
|
||||
from .exc import BadHeader
|
||||
from .exc import BadPayload
|
||||
from .exc import BadSignature
|
||||
from .exc import SignatureExpired
|
||||
from .serializer import Serializer
|
||||
from .signer import HMACAlgorithm
|
||||
from .signer import NoneAlgorithm
|
||||
|
||||
|
||||
class JSONWebSignatureSerializer(Serializer):
|
||||
"""This serializer implements JSON Web Signature (JWS) support. Only
|
||||
supports the JWS Compact Serialization.
|
||||
|
||||
.. deprecated:: 2.0
|
||||
Will be removed in ItsDangerous 2.1. Use a dedicated library
|
||||
such as authlib.
|
||||
"""
|
||||
|
||||
jws_algorithms = {
|
||||
"HS256": HMACAlgorithm(hashlib.sha256),
|
||||
"HS384": HMACAlgorithm(hashlib.sha384),
|
||||
"HS512": HMACAlgorithm(hashlib.sha512),
|
||||
"none": NoneAlgorithm(),
|
||||
}
|
||||
|
||||
#: The default algorithm to use for signature generation
|
||||
default_algorithm = "HS512"
|
||||
|
||||
default_serializer = _CompactJSON
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
secret_key,
|
||||
salt=None,
|
||||
serializer=None,
|
||||
serializer_kwargs=None,
|
||||
signer=None,
|
||||
signer_kwargs=None,
|
||||
algorithm_name=None,
|
||||
):
|
||||
warnings.warn(
|
||||
"JWS support is deprecated and will be removed in"
|
||||
" ItsDangerous 2.1. Use a dedicated JWS/JWT library such as"
|
||||
" authlib.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
super().__init__(
|
||||
secret_key,
|
||||
salt=salt,
|
||||
serializer=serializer,
|
||||
serializer_kwargs=serializer_kwargs,
|
||||
signer=signer,
|
||||
signer_kwargs=signer_kwargs,
|
||||
)
|
||||
|
||||
if algorithm_name is None:
|
||||
algorithm_name = self.default_algorithm
|
||||
|
||||
self.algorithm_name = algorithm_name
|
||||
self.algorithm = self.make_algorithm(algorithm_name)
|
||||
|
||||
def load_payload(self, payload, serializer=None, return_header=False):
|
||||
payload = want_bytes(payload)
|
||||
|
||||
if b"." not in payload:
|
||||
raise BadPayload('No "." found in value')
|
||||
|
||||
base64d_header, base64d_payload = payload.split(b".", 1)
|
||||
|
||||
try:
|
||||
json_header = base64_decode(base64d_header)
|
||||
except Exception as e:
|
||||
raise BadHeader(
|
||||
"Could not base64 decode the header because of an exception",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
try:
|
||||
json_payload = base64_decode(base64d_payload)
|
||||
except Exception as e:
|
||||
raise BadPayload(
|
||||
"Could not base64 decode the payload because of an exception",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
try:
|
||||
header = super().load_payload(json_header, serializer=_CompactJSON)
|
||||
except BadData as e:
|
||||
raise BadHeader(
|
||||
"Could not unserialize header because it was malformed",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
if not isinstance(header, dict):
|
||||
raise BadHeader("Header payload is not a JSON object", header=header)
|
||||
|
||||
payload = super().load_payload(json_payload, serializer=serializer)
|
||||
|
||||
if return_header:
|
||||
return payload, header
|
||||
|
||||
return payload
|
||||
|
||||
def dump_payload(self, header, obj):
|
||||
base64d_header = base64_encode(
|
||||
self.serializer.dumps(header, **self.serializer_kwargs)
|
||||
)
|
||||
base64d_payload = base64_encode(
|
||||
self.serializer.dumps(obj, **self.serializer_kwargs)
|
||||
)
|
||||
return base64d_header + b"." + base64d_payload
|
||||
|
||||
def make_algorithm(self, algorithm_name):
|
||||
try:
|
||||
return self.jws_algorithms[algorithm_name]
|
||||
except KeyError:
|
||||
raise NotImplementedError("Algorithm not supported")
|
||||
|
||||
def make_signer(self, salt=None, algorithm=None):
|
||||
if salt is None:
|
||||
salt = self.salt
|
||||
|
||||
key_derivation = "none" if salt is None else None
|
||||
|
||||
if algorithm is None:
|
||||
algorithm = self.algorithm
|
||||
|
||||
return self.signer(
|
||||
self.secret_keys,
|
||||
salt=salt,
|
||||
sep=".",
|
||||
key_derivation=key_derivation,
|
||||
algorithm=algorithm,
|
||||
)
|
||||
|
||||
def make_header(self, header_fields):
|
||||
header = header_fields.copy() if header_fields else {}
|
||||
header["alg"] = self.algorithm_name
|
||||
return header
|
||||
|
||||
def dumps(self, obj, salt=None, header_fields=None):
|
||||
"""Like :meth:`.Serializer.dumps` but creates a JSON Web
|
||||
Signature. It also allows for specifying additional fields to be
|
||||
included in the JWS header.
|
||||
"""
|
||||
header = self.make_header(header_fields)
|
||||
signer = self.make_signer(salt, self.algorithm)
|
||||
return signer.sign(self.dump_payload(header, obj))
|
||||
|
||||
def loads(self, s, salt=None, return_header=False):
|
||||
"""Reverse of :meth:`dumps`. If requested via ``return_header``
|
||||
it will return a tuple of payload and header.
|
||||
"""
|
||||
payload, header = self.load_payload(
|
||||
self.make_signer(salt, self.algorithm).unsign(want_bytes(s)),
|
||||
return_header=True,
|
||||
)
|
||||
|
||||
if header.get("alg") != self.algorithm_name:
|
||||
raise BadHeader("Algorithm mismatch", header=header, payload=payload)
|
||||
|
||||
if return_header:
|
||||
return payload, header
|
||||
|
||||
return payload
|
||||
|
||||
def loads_unsafe(self, s, salt=None, return_header=False):
|
||||
kwargs = {"return_header": return_header}
|
||||
return self._loads_unsafe_impl(s, salt, kwargs, kwargs)
|
||||
|
||||
|
||||
class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer):
|
||||
"""Works like the regular :class:`JSONWebSignatureSerializer` but
|
||||
also records the time of the signing and can be used to expire
|
||||
signatures.
|
||||
|
||||
JWS currently does not specify this behavior but it mentions a
|
||||
possible extension like this in the spec. Expiry date is encoded
|
||||
into the header similar to what's specified in `draft-ietf-oauth
|
||||
-json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json
|
||||
-web-token.html#expDef>`_.
|
||||
"""
|
||||
|
||||
DEFAULT_EXPIRES_IN = 3600
|
||||
|
||||
def __init__(self, secret_key, expires_in=None, **kwargs):
|
||||
super().__init__(secret_key, **kwargs)
|
||||
|
||||
if expires_in is None:
|
||||
expires_in = self.DEFAULT_EXPIRES_IN
|
||||
|
||||
self.expires_in = expires_in
|
||||
|
||||
def make_header(self, header_fields):
|
||||
header = super().make_header(header_fields)
|
||||
iat = self.now()
|
||||
exp = iat + self.expires_in
|
||||
header["iat"] = iat
|
||||
header["exp"] = exp
|
||||
return header
|
||||
|
||||
def loads(self, s, salt=None, return_header=False):
|
||||
payload, header = super().loads(s, salt, return_header=True)
|
||||
|
||||
if "exp" not in header:
|
||||
raise BadSignature("Missing expiry date", payload=payload)
|
||||
|
||||
int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
|
||||
|
||||
try:
|
||||
header["exp"] = int(header["exp"])
|
||||
except ValueError:
|
||||
raise int_date_error
|
||||
|
||||
if header["exp"] < 0:
|
||||
raise int_date_error
|
||||
|
||||
if header["exp"] < self.now():
|
||||
raise SignatureExpired(
|
||||
"Signature expired",
|
||||
payload=payload,
|
||||
date_signed=self.get_issue_date(header),
|
||||
)
|
||||
|
||||
if return_header:
|
||||
return payload, header
|
||||
|
||||
return payload
|
||||
|
||||
def get_issue_date(self, header):
|
||||
"""If the header contains the ``iat`` field, return the date the
|
||||
signature was issued, as a timezone-aware
|
||||
:class:`datetime.datetime` in UTC.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
The timestamp is returned as a timezone-aware ``datetime``
|
||||
in UTC rather than a naive ``datetime`` assumed to be UTC.
|
||||
"""
|
||||
rv = header.get("iat")
|
||||
|
||||
if isinstance(rv, (Real, Decimal)):
|
||||
return datetime.fromtimestamp(int(rv), tz=timezone.utc)
|
||||
|
||||
def now(self):
|
||||
return int(time.time())
|
||||
|
|
@ -0,0 +1,295 @@
|
|||
import json
|
||||
import typing as _t
|
||||
|
||||
from .encoding import want_bytes
|
||||
from .exc import BadPayload
|
||||
from .exc import BadSignature
|
||||
from .signer import _make_keys_list
|
||||
from .signer import Signer
|
||||
|
||||
_t_str_bytes = _t.Union[str, bytes]
|
||||
_t_opt_str_bytes = _t.Optional[_t_str_bytes]
|
||||
_t_kwargs = _t.Dict[str, _t.Any]
|
||||
_t_opt_kwargs = _t.Optional[_t_kwargs]
|
||||
_t_signer = _t.Type[Signer]
|
||||
_t_fallbacks = _t.List[_t.Union[_t_kwargs, _t.Tuple[_t_signer, _t_kwargs], _t_signer]]
|
||||
_t_load_unsafe = _t.Tuple[bool, _t.Any]
|
||||
_t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes]
|
||||
|
||||
|
||||
def is_text_serializer(serializer: _t.Any) -> bool:
|
||||
"""Checks whether a serializer generates text or binary."""
|
||||
return isinstance(serializer.dumps({}), str)
|
||||
|
||||
|
||||
class Serializer:
|
||||
"""A serializer wraps a :class:`~itsdangerous.signer.Signer` to
|
||||
enable serializing and securely signing data other than bytes. It
|
||||
can unsign to verify that the data hasn't been changed.
|
||||
|
||||
The serializer provides :meth:`dumps` and :meth:`loads`, similar to
|
||||
:mod:`json`, and by default uses :mod:`json` internally to serialize
|
||||
the data to bytes.
|
||||
|
||||
The secret key should be a random string of ``bytes`` and should not
|
||||
be saved to code or version control. Different salts should be used
|
||||
to distinguish signing in different contexts. See :doc:`/concepts`
|
||||
for information about the security of the secret key and salt.
|
||||
|
||||
:param secret_key: The secret key to sign and verify with. Can be a
|
||||
list of keys, oldest to newest, to support key rotation.
|
||||
:param salt: Extra key to combine with ``secret_key`` to distinguish
|
||||
signatures in different contexts.
|
||||
:param serializer: An object that provides ``dumps`` and ``loads``
|
||||
methods for serializing data to a string. Defaults to
|
||||
:attr:`default_serializer`, which defaults to :mod:`json`.
|
||||
:param serializer_kwargs: Keyword arguments to pass when calling
|
||||
``serializer.dumps``.
|
||||
:param signer: A ``Signer`` class to instantiate when signing data.
|
||||
Defaults to :attr:`default_signer`, which defaults to
|
||||
:class:`~itsdangerous.signer.Signer`.
|
||||
:param signer_kwargs: Keyword arguments to pass when instantiating
|
||||
the ``Signer`` class.
|
||||
:param fallback_signers: List of signer parameters to try when
|
||||
unsigning with the default signer fails. Each item can be a dict
|
||||
of ``signer_kwargs``, a ``Signer`` class, or a tuple of
|
||||
``(signer, signer_kwargs)``. Defaults to
|
||||
:attr:`default_fallback_signers`.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Added support for key rotation by passing a list to
|
||||
``secret_key``.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Removed the default SHA-512 fallback signer from
|
||||
``default_fallback_signers``.
|
||||
|
||||
.. versionchanged:: 1.1
|
||||
Added support for ``fallback_signers`` and configured a default
|
||||
SHA-512 fallback. This fallback is for users who used the yanked
|
||||
1.0.0 release which defaulted to SHA-512.
|
||||
|
||||
.. versionchanged:: 0.14
|
||||
The ``signer`` and ``signer_kwargs`` parameters were added to
|
||||
the constructor.
|
||||
"""
|
||||
|
||||
#: The default serialization module to use to serialize data to a
|
||||
#: string internally. The default is :mod:`json`, but can be changed
|
||||
#: to any object that provides ``dumps`` and ``loads`` methods.
|
||||
default_serializer: _t.Any = json
|
||||
|
||||
#: The default ``Signer`` class to instantiate when signing data.
|
||||
#: The default is :class:`itsdangerous.signer.Signer`.
|
||||
default_signer: _t_signer = Signer
|
||||
|
||||
#: The default fallback signers to try when unsigning fails.
|
||||
default_fallback_signers: _t_fallbacks = []
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
secret_key: _t_secret_key,
|
||||
salt: _t_opt_str_bytes = b"itsdangerous",
|
||||
serializer: _t.Any = None,
|
||||
serializer_kwargs: _t_opt_kwargs = None,
|
||||
signer: _t.Optional[_t_signer] = None,
|
||||
signer_kwargs: _t_opt_kwargs = None,
|
||||
fallback_signers: _t.Optional[_t_fallbacks] = None,
|
||||
):
|
||||
#: The list of secret keys to try for verifying signatures, from
|
||||
#: oldest to newest. The newest (last) key is used for signing.
|
||||
#:
|
||||
#: This allows a key rotation system to keep a list of allowed
|
||||
#: keys and remove expired ones.
|
||||
self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key)
|
||||
|
||||
if salt is not None:
|
||||
salt = want_bytes(salt)
|
||||
# if salt is None then the signer's default is used
|
||||
|
||||
self.salt = salt
|
||||
|
||||
if serializer is None:
|
||||
serializer = self.default_serializer
|
||||
|
||||
self.serializer: _t.Any = serializer
|
||||
self.is_text_serializer: bool = is_text_serializer(serializer)
|
||||
|
||||
if signer is None:
|
||||
signer = self.default_signer
|
||||
|
||||
self.signer: _t_signer = signer
|
||||
self.signer_kwargs: _t_kwargs = signer_kwargs or {}
|
||||
|
||||
if fallback_signers is None:
|
||||
fallback_signers = list(self.default_fallback_signers or ())
|
||||
|
||||
self.fallback_signers: _t_fallbacks = fallback_signers
|
||||
self.serializer_kwargs: _t_kwargs = serializer_kwargs or {}
|
||||
|
||||
@property
|
||||
def secret_key(self) -> bytes:
|
||||
"""The newest (last) entry in the :attr:`secret_keys` list. This
|
||||
is for compatibility from before key rotation support was added.
|
||||
"""
|
||||
return self.secret_keys[-1]
|
||||
|
||||
def load_payload(
|
||||
self, payload: bytes, serializer: _t.Optional[_t.Any] = None
|
||||
) -> _t.Any:
|
||||
"""Loads the encoded object. This function raises
|
||||
:class:`.BadPayload` if the payload is not valid. The
|
||||
``serializer`` parameter can be used to override the serializer
|
||||
stored on the class. The encoded ``payload`` should always be
|
||||
bytes.
|
||||
"""
|
||||
if serializer is None:
|
||||
serializer = self.serializer
|
||||
is_text = self.is_text_serializer
|
||||
else:
|
||||
is_text = is_text_serializer(serializer)
|
||||
|
||||
try:
|
||||
if is_text:
|
||||
return serializer.loads(payload.decode("utf-8"))
|
||||
|
||||
return serializer.loads(payload)
|
||||
except Exception as e:
|
||||
raise BadPayload(
|
||||
"Could not load the payload because an exception"
|
||||
" occurred on unserializing the data.",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
def dump_payload(self, obj: _t.Any) -> bytes:
|
||||
"""Dumps the encoded object. The return value is always bytes.
|
||||
If the internal serializer returns text, the value will be
|
||||
encoded as UTF-8.
|
||||
"""
|
||||
return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs))
|
||||
|
||||
def make_signer(self, salt: _t_opt_str_bytes = None) -> Signer:
|
||||
"""Creates a new instance of the signer to be used. The default
|
||||
implementation uses the :class:`.Signer` base class.
|
||||
"""
|
||||
if salt is None:
|
||||
salt = self.salt
|
||||
|
||||
return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs)
|
||||
|
||||
def iter_unsigners(self, salt: _t_opt_str_bytes = None) -> _t.Iterator[Signer]:
|
||||
"""Iterates over all signers to be tried for unsigning. Starts
|
||||
with the configured signer, then constructs each signer
|
||||
specified in ``fallback_signers``.
|
||||
"""
|
||||
if salt is None:
|
||||
salt = self.salt
|
||||
|
||||
yield self.make_signer(salt)
|
||||
|
||||
for fallback in self.fallback_signers:
|
||||
if isinstance(fallback, dict):
|
||||
kwargs = fallback
|
||||
fallback = self.signer
|
||||
elif isinstance(fallback, tuple):
|
||||
fallback, kwargs = fallback
|
||||
else:
|
||||
kwargs = self.signer_kwargs
|
||||
|
||||
for secret_key in self.secret_keys:
|
||||
yield fallback(secret_key, salt=salt, **kwargs)
|
||||
|
||||
def dumps(self, obj: _t.Any, salt: _t_opt_str_bytes = None) -> _t_str_bytes:
|
||||
"""Returns a signed string serialized with the internal
|
||||
serializer. The return value can be either a byte or unicode
|
||||
string depending on the format of the internal serializer.
|
||||
"""
|
||||
payload = want_bytes(self.dump_payload(obj))
|
||||
rv = self.make_signer(salt).sign(payload)
|
||||
|
||||
if self.is_text_serializer:
|
||||
return rv.decode("utf-8")
|
||||
|
||||
return rv
|
||||
|
||||
def dump(self, obj: _t.Any, f: _t.IO, salt: _t_opt_str_bytes = None) -> None:
|
||||
"""Like :meth:`dumps` but dumps into a file. The file handle has
|
||||
to be compatible with what the internal serializer expects.
|
||||
"""
|
||||
f.write(self.dumps(obj, salt))
|
||||
|
||||
def loads(
|
||||
self, s: _t_str_bytes, salt: _t_opt_str_bytes = None, **kwargs: _t.Any
|
||||
) -> _t.Any:
|
||||
"""Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the
|
||||
signature validation fails.
|
||||
"""
|
||||
s = want_bytes(s)
|
||||
last_exception = None
|
||||
|
||||
for signer in self.iter_unsigners(salt):
|
||||
try:
|
||||
return self.load_payload(signer.unsign(s))
|
||||
except BadSignature as err:
|
||||
last_exception = err
|
||||
|
||||
raise _t.cast(BadSignature, last_exception)
|
||||
|
||||
def load(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t.Any:
|
||||
"""Like :meth:`loads` but loads from a file."""
|
||||
return self.loads(f.read(), salt)
|
||||
|
||||
def loads_unsafe(
|
||||
self, s: _t_str_bytes, salt: _t_opt_str_bytes = None
|
||||
) -> _t_load_unsafe:
|
||||
"""Like :meth:`loads` but without verifying the signature. This
|
||||
is potentially very dangerous to use depending on how your
|
||||
serializer works. The return value is ``(signature_valid,
|
||||
payload)`` instead of just the payload. The first item will be a
|
||||
boolean that indicates if the signature is valid. This function
|
||||
never fails.
|
||||
|
||||
Use it for debugging only and if you know that your serializer
|
||||
module is not exploitable (for example, do not use it with a
|
||||
pickle serializer).
|
||||
|
||||
.. versionadded:: 0.15
|
||||
"""
|
||||
return self._loads_unsafe_impl(s, salt)
|
||||
|
||||
def _loads_unsafe_impl(
|
||||
self,
|
||||
s: _t_str_bytes,
|
||||
salt: _t_opt_str_bytes,
|
||||
load_kwargs: _t_opt_kwargs = None,
|
||||
load_payload_kwargs: _t_opt_kwargs = None,
|
||||
) -> _t_load_unsafe:
|
||||
"""Low level helper function to implement :meth:`loads_unsafe`
|
||||
in serializer subclasses.
|
||||
"""
|
||||
if load_kwargs is None:
|
||||
load_kwargs = {}
|
||||
|
||||
try:
|
||||
return True, self.loads(s, salt=salt, **load_kwargs)
|
||||
except BadSignature as e:
|
||||
if e.payload is None:
|
||||
return False, None
|
||||
|
||||
if load_payload_kwargs is None:
|
||||
load_payload_kwargs = {}
|
||||
|
||||
try:
|
||||
return (
|
||||
False,
|
||||
self.load_payload(e.payload, **load_payload_kwargs),
|
||||
)
|
||||
except BadPayload:
|
||||
return False, None
|
||||
|
||||
def load_unsafe(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t_load_unsafe:
|
||||
"""Like :meth:`loads_unsafe` but loads from a file.
|
||||
|
||||
.. versionadded:: 0.15
|
||||
"""
|
||||
return self.loads_unsafe(f.read(), salt=salt)
|
||||
|
|
@ -0,0 +1,257 @@
|
|||
import hashlib
|
||||
import hmac
|
||||
import typing as _t
|
||||
|
||||
from .encoding import _base64_alphabet
|
||||
from .encoding import base64_decode
|
||||
from .encoding import base64_encode
|
||||
from .encoding import want_bytes
|
||||
from .exc import BadSignature
|
||||
|
||||
_t_str_bytes = _t.Union[str, bytes]
|
||||
_t_opt_str_bytes = _t.Optional[_t_str_bytes]
|
||||
_t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes]
|
||||
|
||||
|
||||
class SigningAlgorithm:
|
||||
"""Subclasses must implement :meth:`get_signature` to provide
|
||||
signature generation functionality.
|
||||
"""
|
||||
|
||||
def get_signature(self, key: bytes, value: bytes) -> bytes:
|
||||
"""Returns the signature for the given key and value."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def verify_signature(self, key: bytes, value: bytes, sig: bytes) -> bool:
|
||||
"""Verifies the given signature matches the expected
|
||||
signature.
|
||||
"""
|
||||
return hmac.compare_digest(sig, self.get_signature(key, value))
|
||||
|
||||
|
||||
class NoneAlgorithm(SigningAlgorithm):
|
||||
"""Provides an algorithm that does not perform any signing and
|
||||
returns an empty signature.
|
||||
"""
|
||||
|
||||
def get_signature(self, key: bytes, value: bytes) -> bytes:
|
||||
return b""
|
||||
|
||||
|
||||
class HMACAlgorithm(SigningAlgorithm):
|
||||
"""Provides signature generation using HMACs."""
|
||||
|
||||
#: The digest method to use with the MAC algorithm. This defaults to
|
||||
#: SHA1, but can be changed to any other function in the hashlib
|
||||
#: module.
|
||||
default_digest_method: _t.Any = staticmethod(hashlib.sha1)
|
||||
|
||||
def __init__(self, digest_method: _t.Any = None):
|
||||
if digest_method is None:
|
||||
digest_method = self.default_digest_method
|
||||
|
||||
self.digest_method: _t.Any = digest_method
|
||||
|
||||
def get_signature(self, key: bytes, value: bytes) -> bytes:
|
||||
mac = hmac.new(key, msg=value, digestmod=self.digest_method)
|
||||
return mac.digest()
|
||||
|
||||
|
||||
def _make_keys_list(secret_key: _t_secret_key) -> _t.List[bytes]:
|
||||
if isinstance(secret_key, (str, bytes)):
|
||||
return [want_bytes(secret_key)]
|
||||
|
||||
return [want_bytes(s) for s in secret_key]
|
||||
|
||||
|
||||
class Signer:
|
||||
"""A signer securely signs bytes, then unsigns them to verify that
|
||||
the value hasn't been changed.
|
||||
|
||||
The secret key should be a random string of ``bytes`` and should not
|
||||
be saved to code or version control. Different salts should be used
|
||||
to distinguish signing in different contexts. See :doc:`/concepts`
|
||||
for information about the security of the secret key and salt.
|
||||
|
||||
:param secret_key: The secret key to sign and verify with. Can be a
|
||||
list of keys, oldest to newest, to support key rotation.
|
||||
:param salt: Extra key to combine with ``secret_key`` to distinguish
|
||||
signatures in different contexts.
|
||||
:param sep: Separator between the signature and value.
|
||||
:param key_derivation: How to derive the signing key from the secret
|
||||
key and salt. Possible values are ``concat``, ``django-concat``,
|
||||
or ``hmac``. Defaults to :attr:`default_key_derivation`, which
|
||||
defaults to ``django-concat``.
|
||||
:param digest_method: Hash function to use when generating the HMAC
|
||||
signature. Defaults to :attr:`default_digest_method`, which
|
||||
defaults to :func:`hashlib.sha1`. Note that the security of the
|
||||
hash alone doesn't apply when used intermediately in HMAC.
|
||||
:param algorithm: A :class:`SigningAlgorithm` instance to use
|
||||
instead of building a default :class:`HMACAlgorithm` with the
|
||||
``digest_method``.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Added support for key rotation by passing a list to
|
||||
``secret_key``.
|
||||
|
||||
.. versionchanged:: 0.18
|
||||
``algorithm`` was added as an argument to the class constructor.
|
||||
|
||||
.. versionchanged:: 0.14
|
||||
``key_derivation`` and ``digest_method`` were added as arguments
|
||||
to the class constructor.
|
||||
"""
|
||||
|
||||
#: The default digest method to use for the signer. The default is
|
||||
#: :func:`hashlib.sha1`, but can be changed to any :mod:`hashlib` or
|
||||
#: compatible object. Note that the security of the hash alone
|
||||
#: doesn't apply when used intermediately in HMAC.
|
||||
#:
|
||||
#: .. versionadded:: 0.14
|
||||
default_digest_method: _t.Any = staticmethod(hashlib.sha1)
|
||||
|
||||
#: The default scheme to use to derive the signing key from the
|
||||
#: secret key and salt. The default is ``django-concat``. Possible
|
||||
#: values are ``concat``, ``django-concat``, and ``hmac``.
|
||||
#:
|
||||
#: .. versionadded:: 0.14
|
||||
default_key_derivation: str = "django-concat"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
secret_key: _t_secret_key,
|
||||
salt: _t_opt_str_bytes = b"itsdangerous.Signer",
|
||||
sep: _t_str_bytes = b".",
|
||||
key_derivation: _t.Optional[str] = None,
|
||||
digest_method: _t.Optional[_t.Any] = None,
|
||||
algorithm: _t.Optional[SigningAlgorithm] = None,
|
||||
):
|
||||
#: The list of secret keys to try for verifying signatures, from
|
||||
#: oldest to newest. The newest (last) key is used for signing.
|
||||
#:
|
||||
#: This allows a key rotation system to keep a list of allowed
|
||||
#: keys and remove expired ones.
|
||||
self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key)
|
||||
self.sep: bytes = want_bytes(sep)
|
||||
|
||||
if self.sep in _base64_alphabet:
|
||||
raise ValueError(
|
||||
"The given separator cannot be used because it may be"
|
||||
" contained in the signature itself. ASCII letters,"
|
||||
" digits, and '-_=' must not be used."
|
||||
)
|
||||
|
||||
if salt is not None:
|
||||
salt = want_bytes(salt)
|
||||
else:
|
||||
salt = b"itsdangerous.Signer"
|
||||
|
||||
self.salt = salt
|
||||
|
||||
if key_derivation is None:
|
||||
key_derivation = self.default_key_derivation
|
||||
|
||||
self.key_derivation: str = key_derivation
|
||||
|
||||
if digest_method is None:
|
||||
digest_method = self.default_digest_method
|
||||
|
||||
self.digest_method: _t.Any = digest_method
|
||||
|
||||
if algorithm is None:
|
||||
algorithm = HMACAlgorithm(self.digest_method)
|
||||
|
||||
self.algorithm: SigningAlgorithm = algorithm
|
||||
|
||||
@property
|
||||
def secret_key(self) -> bytes:
|
||||
"""The newest (last) entry in the :attr:`secret_keys` list. This
|
||||
is for compatibility from before key rotation support was added.
|
||||
"""
|
||||
return self.secret_keys[-1]
|
||||
|
||||
def derive_key(self, secret_key: _t_opt_str_bytes = None) -> bytes:
|
||||
"""This method is called to derive the key. The default key
|
||||
derivation choices can be overridden here. Key derivation is not
|
||||
intended to be used as a security method to make a complex key
|
||||
out of a short password. Instead you should use large random
|
||||
secret keys.
|
||||
|
||||
:param secret_key: A specific secret key to derive from.
|
||||
Defaults to the last item in :attr:`secret_keys`.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Added the ``secret_key`` parameter.
|
||||
"""
|
||||
if secret_key is None:
|
||||
secret_key = self.secret_keys[-1]
|
||||
else:
|
||||
secret_key = want_bytes(secret_key)
|
||||
|
||||
if self.key_derivation == "concat":
|
||||
return _t.cast(bytes, self.digest_method(self.salt + secret_key).digest())
|
||||
elif self.key_derivation == "django-concat":
|
||||
return _t.cast(
|
||||
bytes, self.digest_method(self.salt + b"signer" + secret_key).digest()
|
||||
)
|
||||
elif self.key_derivation == "hmac":
|
||||
mac = hmac.new(secret_key, digestmod=self.digest_method)
|
||||
mac.update(self.salt)
|
||||
return mac.digest()
|
||||
elif self.key_derivation == "none":
|
||||
return secret_key
|
||||
else:
|
||||
raise TypeError("Unknown key derivation method")
|
||||
|
||||
def get_signature(self, value: _t_str_bytes) -> bytes:
|
||||
"""Returns the signature for the given value."""
|
||||
value = want_bytes(value)
|
||||
key = self.derive_key()
|
||||
sig = self.algorithm.get_signature(key, value)
|
||||
return base64_encode(sig)
|
||||
|
||||
def sign(self, value: _t_str_bytes) -> bytes:
|
||||
"""Signs the given string."""
|
||||
value = want_bytes(value)
|
||||
return value + self.sep + self.get_signature(value)
|
||||
|
||||
def verify_signature(self, value: _t_str_bytes, sig: _t_str_bytes) -> bool:
|
||||
"""Verifies the signature for the given value."""
|
||||
try:
|
||||
sig = base64_decode(sig)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
value = want_bytes(value)
|
||||
|
||||
for secret_key in reversed(self.secret_keys):
|
||||
key = self.derive_key(secret_key)
|
||||
|
||||
if self.algorithm.verify_signature(key, value, sig):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def unsign(self, signed_value: _t_str_bytes) -> bytes:
|
||||
"""Unsigns the given string."""
|
||||
signed_value = want_bytes(signed_value)
|
||||
|
||||
if self.sep not in signed_value:
|
||||
raise BadSignature(f"No {self.sep!r} found in value")
|
||||
|
||||
value, sig = signed_value.rsplit(self.sep, 1)
|
||||
|
||||
if self.verify_signature(value, sig):
|
||||
return value
|
||||
|
||||
raise BadSignature(f"Signature {sig!r} does not match", payload=value)
|
||||
|
||||
def validate(self, signed_value: _t_str_bytes) -> bool:
|
||||
"""Only validates the given signed value. Returns ``True`` if
|
||||
the signature exists and is valid.
|
||||
"""
|
||||
try:
|
||||
self.unsign(signed_value)
|
||||
return True
|
||||
except BadSignature:
|
||||
return False
|
||||
|
|
@ -0,0 +1,227 @@
|
|||
import time
|
||||
import typing
|
||||
import typing as _t
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
|
||||
from .encoding import base64_decode
|
||||
from .encoding import base64_encode
|
||||
from .encoding import bytes_to_int
|
||||
from .encoding import int_to_bytes
|
||||
from .encoding import want_bytes
|
||||
from .exc import BadSignature
|
||||
from .exc import BadTimeSignature
|
||||
from .exc import SignatureExpired
|
||||
from .serializer import Serializer
|
||||
from .signer import Signer
|
||||
|
||||
_t_str_bytes = _t.Union[str, bytes]
|
||||
_t_opt_str_bytes = _t.Optional[_t_str_bytes]
|
||||
_t_opt_int = _t.Optional[int]
|
||||
|
||||
if _t.TYPE_CHECKING:
|
||||
import typing_extensions as _te
|
||||
|
||||
|
||||
class TimestampSigner(Signer):
|
||||
"""Works like the regular :class:`.Signer` but also records the time
|
||||
of the signing and can be used to expire signatures. The
|
||||
:meth:`unsign` method can raise :exc:`.SignatureExpired` if the
|
||||
unsigning failed because the signature is expired.
|
||||
"""
|
||||
|
||||
def get_timestamp(self) -> int:
|
||||
"""Returns the current timestamp. The function must return an
|
||||
integer.
|
||||
"""
|
||||
return int(time.time())
|
||||
|
||||
def timestamp_to_datetime(self, ts: int) -> datetime:
|
||||
"""Convert the timestamp from :meth:`get_timestamp` into an
|
||||
aware :class`datetime.datetime` in UTC.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
The timestamp is returned as a timezone-aware ``datetime``
|
||||
in UTC rather than a naive ``datetime`` assumed to be UTC.
|
||||
"""
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc)
|
||||
|
||||
def sign(self, value: _t_str_bytes) -> bytes:
|
||||
"""Signs the given string and also attaches time information."""
|
||||
value = want_bytes(value)
|
||||
timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
|
||||
sep = want_bytes(self.sep)
|
||||
value = value + sep + timestamp
|
||||
return value + sep + self.get_signature(value)
|
||||
|
||||
# Ignore overlapping signatures check, return_timestamp is the only
|
||||
# parameter that affects the return type.
|
||||
|
||||
@typing.overload
|
||||
def unsign( # type: ignore
|
||||
self,
|
||||
signed_value: _t_str_bytes,
|
||||
max_age: _t_opt_int = None,
|
||||
return_timestamp: "_te.Literal[False]" = False,
|
||||
) -> bytes:
|
||||
...
|
||||
|
||||
@typing.overload
|
||||
def unsign(
|
||||
self,
|
||||
signed_value: _t_str_bytes,
|
||||
max_age: _t_opt_int = None,
|
||||
return_timestamp: "_te.Literal[True]" = True,
|
||||
) -> _t.Tuple[bytes, datetime]:
|
||||
...
|
||||
|
||||
def unsign(
|
||||
self,
|
||||
signed_value: _t_str_bytes,
|
||||
max_age: _t_opt_int = None,
|
||||
return_timestamp: bool = False,
|
||||
) -> _t.Union[_t.Tuple[bytes, datetime], bytes]:
|
||||
"""Works like the regular :meth:`.Signer.unsign` but can also
|
||||
validate the time. See the base docstring of the class for
|
||||
the general behavior. If ``return_timestamp`` is ``True`` the
|
||||
timestamp of the signature will be returned as an aware
|
||||
:class:`datetime.datetime` object in UTC.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
The timestamp is returned as a timezone-aware ``datetime``
|
||||
in UTC rather than a naive ``datetime`` assumed to be UTC.
|
||||
"""
|
||||
try:
|
||||
result = super().unsign(signed_value)
|
||||
sig_error = None
|
||||
except BadSignature as e:
|
||||
sig_error = e
|
||||
result = e.payload or b""
|
||||
|
||||
sep = want_bytes(self.sep)
|
||||
|
||||
# If there is no timestamp in the result there is something
|
||||
# seriously wrong. In case there was a signature error, we raise
|
||||
# that one directly, otherwise we have a weird situation in
|
||||
# which we shouldn't have come except someone uses a time-based
|
||||
# serializer on non-timestamp data, so catch that.
|
||||
if sep not in result:
|
||||
if sig_error:
|
||||
raise sig_error
|
||||
|
||||
raise BadTimeSignature("timestamp missing", payload=result)
|
||||
|
||||
value, ts_bytes = result.rsplit(sep, 1)
|
||||
ts_int: _t_opt_int = None
|
||||
ts_dt: _t.Optional[datetime] = None
|
||||
|
||||
try:
|
||||
ts_int = bytes_to_int(base64_decode(ts_bytes))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Signature is *not* okay. Raise a proper error now that we have
|
||||
# split the value and the timestamp.
|
||||
if sig_error is not None:
|
||||
if ts_int is not None:
|
||||
ts_dt = self.timestamp_to_datetime(ts_int)
|
||||
|
||||
raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt)
|
||||
|
||||
# Signature was okay but the timestamp is actually not there or
|
||||
# malformed. Should not happen, but we handle it anyway.
|
||||
if ts_int is None:
|
||||
raise BadTimeSignature("Malformed timestamp", payload=value)
|
||||
|
||||
# Check timestamp is not older than max_age
|
||||
if max_age is not None:
|
||||
age = self.get_timestamp() - ts_int
|
||||
|
||||
if age > max_age:
|
||||
raise SignatureExpired(
|
||||
f"Signature age {age} > {max_age} seconds",
|
||||
payload=value,
|
||||
date_signed=self.timestamp_to_datetime(ts_int),
|
||||
)
|
||||
|
||||
if age < 0:
|
||||
raise SignatureExpired(
|
||||
f"Signature age {age} < 0 seconds",
|
||||
payload=value,
|
||||
date_signed=self.timestamp_to_datetime(ts_int),
|
||||
)
|
||||
|
||||
if return_timestamp:
|
||||
return value, self.timestamp_to_datetime(ts_int)
|
||||
|
||||
return value
|
||||
|
||||
def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool:
|
||||
"""Only validates the given signed value. Returns ``True`` if
|
||||
the signature exists and is valid."""
|
||||
try:
|
||||
self.unsign(signed_value, max_age=max_age)
|
||||
return True
|
||||
except BadSignature:
|
||||
return False
|
||||
|
||||
|
||||
class TimedSerializer(Serializer):
|
||||
"""Uses :class:`TimestampSigner` instead of the default
|
||||
:class:`.Signer`.
|
||||
"""
|
||||
|
||||
default_signer: _t.Type[TimestampSigner] = TimestampSigner
|
||||
|
||||
def iter_unsigners(
|
||||
self, salt: _t_opt_str_bytes = None
|
||||
) -> _t.Iterator[TimestampSigner]:
|
||||
return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt))
|
||||
|
||||
# TODO: Signature is incompatible because parameters were added
|
||||
# before salt.
|
||||
|
||||
def loads( # type: ignore
|
||||
self,
|
||||
s: _t_str_bytes,
|
||||
max_age: _t_opt_int = None,
|
||||
return_timestamp: bool = False,
|
||||
salt: _t_opt_str_bytes = None,
|
||||
) -> _t.Any:
|
||||
"""Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
|
||||
signature validation fails. If a ``max_age`` is provided it will
|
||||
ensure the signature is not older than that time in seconds. In
|
||||
case the signature is outdated, :exc:`.SignatureExpired` is
|
||||
raised. All arguments are forwarded to the signer's
|
||||
:meth:`~TimestampSigner.unsign` method.
|
||||
"""
|
||||
s = want_bytes(s)
|
||||
last_exception = None
|
||||
|
||||
for signer in self.iter_unsigners(salt):
|
||||
try:
|
||||
base64d, timestamp = signer.unsign(
|
||||
s, max_age=max_age, return_timestamp=True
|
||||
)
|
||||
payload = self.load_payload(base64d)
|
||||
|
||||
if return_timestamp:
|
||||
return payload, timestamp
|
||||
|
||||
return payload
|
||||
except SignatureExpired:
|
||||
# The signature was unsigned successfully but was
|
||||
# expired. Do not try the next signer.
|
||||
raise
|
||||
except BadSignature as err:
|
||||
last_exception = err
|
||||
|
||||
raise _t.cast(BadSignature, last_exception)
|
||||
|
||||
def loads_unsafe( # type: ignore
|
||||
self,
|
||||
s: _t_str_bytes,
|
||||
max_age: _t_opt_int = None,
|
||||
salt: _t_opt_str_bytes = None,
|
||||
) -> _t.Tuple[bool, _t.Any]:
|
||||
return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})
|
||||
|
|
@ -0,0 +1,80 @@
|
|||
import typing as _t
|
||||
import zlib
|
||||
|
||||
from ._json import _CompactJSON
|
||||
from .encoding import base64_decode
|
||||
from .encoding import base64_encode
|
||||
from .exc import BadPayload
|
||||
from .serializer import Serializer
|
||||
from .timed import TimedSerializer
|
||||
|
||||
|
||||
class URLSafeSerializerMixin(Serializer):
|
||||
"""Mixed in with a regular serializer it will attempt to zlib
|
||||
compress the string to make it shorter if necessary. It will also
|
||||
base64 encode the string so that it can safely be placed in a URL.
|
||||
"""
|
||||
|
||||
default_serializer = _CompactJSON
|
||||
|
||||
def load_payload(
|
||||
self,
|
||||
payload: bytes,
|
||||
*args: _t.Any,
|
||||
serializer: _t.Optional[_t.Any] = None,
|
||||
**kwargs: _t.Any,
|
||||
) -> _t.Any:
|
||||
decompress = False
|
||||
|
||||
if payload.startswith(b"."):
|
||||
payload = payload[1:]
|
||||
decompress = True
|
||||
|
||||
try:
|
||||
json = base64_decode(payload)
|
||||
except Exception as e:
|
||||
raise BadPayload(
|
||||
"Could not base64 decode the payload because of an exception",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
if decompress:
|
||||
try:
|
||||
json = zlib.decompress(json)
|
||||
except Exception as e:
|
||||
raise BadPayload(
|
||||
"Could not zlib decompress the payload before decoding the payload",
|
||||
original_error=e,
|
||||
)
|
||||
|
||||
return super().load_payload(json, *args, **kwargs)
|
||||
|
||||
def dump_payload(self, obj: _t.Any) -> bytes:
|
||||
json = super().dump_payload(obj)
|
||||
is_compressed = False
|
||||
compressed = zlib.compress(json)
|
||||
|
||||
if len(compressed) < (len(json) - 1):
|
||||
json = compressed
|
||||
is_compressed = True
|
||||
|
||||
base64d = base64_encode(json)
|
||||
|
||||
if is_compressed:
|
||||
base64d = b"." + base64d
|
||||
|
||||
return base64d
|
||||
|
||||
|
||||
class URLSafeSerializer(URLSafeSerializerMixin, Serializer):
|
||||
"""Works like :class:`.Serializer` but dumps and loads into a URL
|
||||
safe string consisting of the upper and lowercase character of the
|
||||
alphabet as well as ``'_'``, ``'-'`` and ``'.'``.
|
||||
"""
|
||||
|
||||
|
||||
class URLSafeTimedSerializer(URLSafeSerializerMixin, TimedSerializer):
|
||||
"""Works like :class:`.TimedSerializer` but dumps and loads into a
|
||||
URL safe string consisting of the upper and lowercase character of
|
||||
the alphabet as well as ``'_'``, ``'-'`` and ``'.'``.
|
||||
"""
|
||||
Loading…
Add table
Add a link
Reference in a new issue