# Source code for graphql_jwt.utils

from calendar import timegm
from datetime import datetime

import django
from django.contrib.auth import get_user_model
from django.utils.translation import gettext as _

import jwt

from . import exceptions
from .settings import jwt_settings


def jwt_payload(user, context=None):
    """Build the claims dictionary for a token issued to ``user``.

    The returned dict always holds the username (keyed by
    ``user.USERNAME_FIELD``) and an ``exp`` timestamp.  ``origIat``,
    ``aud`` and ``iss`` are added only when the matching setting enables
    them.

    Args:
        user: Object providing ``get_username()`` and ``USERNAME_FIELD``.
        context: Accepted for handler-signature compatibility; not used
            by this implementation.

    Returns:
        dict: Claims ready to be handed to the encode handler.
    """
    identifier = user.get_username()
    # If the username value exposes a ``pk`` attribute, store that instead
    # (presumably the username field is a related object -- confirm).
    identifier = getattr(identifier, "pk", identifier)

    expires_at = datetime.utcnow() + jwt_settings.JWT_EXPIRATION_DELTA

    claims = {user.USERNAME_FIELD: identifier}
    claims["exp"] = timegm(expires_at.utctimetuple())

    if jwt_settings.JWT_ALLOW_REFRESH:
        # Record the issue time; refresh_has_expired() compares against it.
        claims["origIat"] = timegm(datetime.utcnow().utctimetuple())

    audience = jwt_settings.JWT_AUDIENCE
    if audience is not None:
        claims["aud"] = audience

    issuer = jwt_settings.JWT_ISSUER
    if issuer is not None:
        claims["iss"] = issuer

    return claims
def jwt_encode(payload, context=None):
    """Serialize ``payload`` into a signed token via ``jwt.encode``.

    The signing key is ``JWT_PRIVATE_KEY`` when that setting is truthy,
    otherwise ``JWT_SECRET_KEY``; the algorithm is ``JWT_ALGORITHM``.

    Args:
        payload: Claims dictionary to encode.
        context: Accepted for handler-signature compatibility; not used
            by this implementation.

    Returns:
        Whatever ``jwt.encode`` returns for the given payload.
    """
    signing_key = jwt_settings.JWT_PRIVATE_KEY or jwt_settings.JWT_SECRET_KEY
    return jwt.encode(payload, signing_key, jwt_settings.JWT_ALGORITHM)
def jwt_decode(token, context=None):
    """Decode ``token`` with ``jwt.decode`` using the configured settings.

    The verification key is ``JWT_PUBLIC_KEY`` when that setting is truthy,
    otherwise ``JWT_SECRET_KEY``.  Expiration and signature checks follow
    ``JWT_VERIFY_EXPIRATION`` and ``JWT_VERIFY``; the audience check is
    switched on only when ``JWT_AUDIENCE`` is configured.

    Args:
        token: Encoded token to decode.
        context: Accepted for handler-signature compatibility; not used
            by this implementation.

    Returns:
        The decoded payload as returned by ``jwt.decode``.
    """
    expected_audience = jwt_settings.JWT_AUDIENCE
    verification_key = jwt_settings.JWT_PUBLIC_KEY or jwt_settings.JWT_SECRET_KEY

    checks = {
        "verify_exp": jwt_settings.JWT_VERIFY_EXPIRATION,
        "verify_aud": expected_audience is not None,
        "verify_signature": jwt_settings.JWT_VERIFY,
    }

    return jwt.decode(
        token,
        verification_key,
        options=checks,
        leeway=jwt_settings.JWT_LEEWAY,
        audience=expected_audience,
        issuer=jwt_settings.JWT_ISSUER,
        algorithms=[jwt_settings.JWT_ALGORITHM],
    )
def get_http_authorization(request):
    """Extract the token from the auth header, falling back to the cookie.

    The header named by ``JWT_AUTH_HEADER_NAME`` is read from
    ``request.META`` and split on whitespace.  When it consists of exactly
    two parts and the first matches ``JWT_AUTH_HEADER_PREFIX``
    (case-insensitively), the second part is returned.  Otherwise the value
    of the ``JWT_COOKIE_NAME`` cookie is returned (``None`` if absent).
    """
    header_value = request.META.get(jwt_settings.JWT_AUTH_HEADER_NAME, "")
    parts = header_value.split()
    expected_prefix = jwt_settings.JWT_AUTH_HEADER_PREFIX

    if len(parts) == 2 and parts[0].lower() == expected_prefix.lower():
        return parts[1]

    return request.COOKIES.get(jwt_settings.JWT_COOKIE_NAME)


def get_token_argument(request, **kwargs):
    """Return the token passed as a field argument, if arguments are allowed.

    Returns ``None`` when ``JWT_ALLOW_ARGUMENT`` is falsy.  Otherwise the
    token is looked up under ``JWT_ARGUMENT_NAME``, either inside
    ``kwargs["input"]`` when that value is a dict, or directly in
    ``kwargs``.  ``request`` is not used by this implementation.
    """
    if not jwt_settings.JWT_ALLOW_ARGUMENT:
        return None

    nested = kwargs.get("input")
    arguments = nested if isinstance(nested, dict) else kwargs
    return arguments.get(jwt_settings.JWT_ARGUMENT_NAME)


def get_credentials(request, **kwargs):
    """Return the token argument when truthy, else the header/cookie token."""
    token = get_token_argument(request, **kwargs)
    if token:
        return token
    return get_http_authorization(request)


def get_payload(token, context=None):
    """Decode ``token`` through ``JWT_DECODE_HANDLER`` and return the payload.

    Raises:
        exceptions.JSONWebTokenExpired: The handler raised
            ``jwt.ExpiredSignatureError``.
        exceptions.JSONWebTokenError: The handler raised ``jwt.DecodeError``
            ("Error decoding signature") or any other
            ``jwt.InvalidTokenError`` ("Invalid token").
    """
    # The specific handlers must stay ahead of the generic InvalidTokenError
    # one (in PyJWT both of the others derive from it).
    try:
        return jwt_settings.JWT_DECODE_HANDLER(token, context)
    except jwt.ExpiredSignatureError:
        raise exceptions.JSONWebTokenExpired()
    except jwt.DecodeError:
        raise exceptions.JSONWebTokenError(_("Error decoding signature"))
    except jwt.InvalidTokenError:
        raise exceptions.JSONWebTokenError(_("Invalid token"))
def get_user_by_natural_key(username):
    """Look up a user by natural key on the active user model.

    Args:
        username: Value passed to the default manager's
            ``get_by_natural_key``.

    Returns:
        The matching user instance, or ``None`` when the model's
        ``DoesNotExist`` is raised.
    """
    user_model = get_user_model()
    try:
        user = user_model._default_manager.get_by_natural_key(username)
    except user_model.DoesNotExist:
        user = None
    return user
def get_user_by_payload(payload):
    """Resolve the user referenced by a decoded token payload.

    The username is extracted with ``JWT_PAYLOAD_GET_USERNAME_HANDLER`` and
    the user fetched with ``JWT_GET_USER_BY_NATURAL_KEY_HANDLER``.

    Returns:
        The user returned by the lookup handler, which may be ``None``.

    Raises:
        exceptions.JSONWebTokenError: The payload yields a falsy username
            ("Invalid payload"), or the user has a falsy ``is_active``
            attribute ("User is disabled").
    """
    natural_key = jwt_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER(payload)
    if not natural_key:
        raise exceptions.JSONWebTokenError(_("Invalid payload"))

    account = jwt_settings.JWT_GET_USER_BY_NATURAL_KEY_HANDLER(natural_key)
    if account is None:
        return account

    # A user object without an ``is_active`` attribute is treated as active.
    if getattr(account, "is_active", True):
        return account

    raise exceptions.JSONWebTokenError(_("User is disabled"))
def refresh_has_expired(orig_iat, context=None):
    """Tell whether the refresh window that started at ``orig_iat`` is over.

    Args:
        orig_iat: Timestamp in seconds to which
            ``JWT_REFRESH_EXPIRATION_DELTA`` is added.
        context: Accepted for handler-signature compatibility; not used
            by this implementation.

    Returns:
        bool: ``True`` when the current UTC timestamp is strictly greater
        than ``orig_iat`` plus the refresh expiration delta.
    """
    lifetime = jwt_settings.JWT_REFRESH_EXPIRATION_DELTA.total_seconds()
    deadline = orig_iat + lifetime
    now = timegm(datetime.utcnow().utctimetuple())
    return now > deadline
def set_cookie(response, key, value, expires):
    """Set an HTTP-only cookie on ``response`` using the JWT cookie settings.

    Args:
        response: Object whose ``set_cookie`` method is called.
        key: Cookie name.
        value: Cookie value.
        expires: Passed through as the cookie's ``expires`` option.
    """
    cookie_options = dict(
        expires=expires,
        httponly=True,
        secure=jwt_settings.JWT_COOKIE_SECURE,
        path=jwt_settings.JWT_COOKIE_PATH,
        domain=jwt_settings.JWT_COOKIE_DOMAIN,
    )

    # ``samesite`` is only forwarded on Django 2.1 or newer.
    supports_samesite = django.VERSION >= (2, 1)
    if supports_samesite:
        cookie_options["samesite"] = jwt_settings.JWT_COOKIE_SAMESITE

    response.set_cookie(key, value, **cookie_options)


def delete_cookie(response, key):
    """Delete cookie ``key`` from ``response`` for the configured path/domain."""
    cookie_path = jwt_settings.JWT_COOKIE_PATH
    cookie_domain = jwt_settings.JWT_COOKIE_DOMAIN
    response.delete_cookie(key, path=cookie_path, domain=cookie_domain)