JWT based authentication in Python bottle
May applications require authentication to secure protected resources. While standards like oAuth accommodate sharing resources between applications, more variance exists in implementations of securing the app in the first place. A recent standard, JWT, provides a mechanism for creating tokens with embedded data, signing these tokens and even encrypting them when warranted.
This post explores how individual resource functions can be protected using JWT. The solution involves first creating a function decorator to perform the authentication step. Each protected resource call is then decorated with the authentication function and subsequent authorization can be performed against the data in the JWT. Let’s first look at the decorator.
jwtsecret = config.authentication.get('jwtsecret') class AuthorizationError(Exception): """ A base class for exceptions used by bottle. """ pass def jwt_token_from_header(): auth = bottle.request.headers.get('Authorization', None) if not auth: raise AuthorizationError({'code': 'authorization_header_missing', 'description': 'Authorization header is expected'}) parts = auth.split() if parts[0].lower() != 'bearer': raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must start with Bearer'}) elif len(parts) == 1: raise AuthorizationError({'code': 'invalid_header', 'description': 'Token not found'}) elif len(parts) > 2: raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must be Bearer + \s + token'}) return parts[1] def requires_auth(f): """Provides JWT based authentication for any decorated function assuming credentials available in an "Authorization" header""" def decorated(*args, **kwargs): try: token = jwt_token_from_header() except AuthorizationError, reason: bottle.abort(400, reason.message) try: token_decoded = jwt.decode(token, jwtsecret) # throw away value except jwt.ExpiredSignature: bottle.abort(401, {'code': 'token_expired', 'description': 'token is expired'}) except jwt.DecodeError, message: bottle.abort(401, {'code': 'token_invalid', 'description': message.message}) return f(*args, **kwargs) return decorated |
jwtsecret = config.authentication.get('jwtsecret') class AuthorizationError(Exception): """ A base class for exceptions used by bottle. """ pass def jwt_token_from_header(): auth = bottle.request.headers.get('Authorization', None) if not auth: raise AuthorizationError({'code': 'authorization_header_missing', 'description': 'Authorization header is expected'}) parts = auth.split() if parts[0].lower() != 'bearer': raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must start with Bearer'}) elif len(parts) == 1: raise AuthorizationError({'code': 'invalid_header', 'description': 'Token not found'}) elif len(parts) > 2: raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must be Bearer + \s + token'}) return parts[1] def requires_auth(f): """Provides JWT based authentication for any decorated function assuming credentials available in an "Authorization" header""" def decorated(*args, **kwargs): try: token = jwt_token_from_header() except AuthorizationError, reason: bottle.abort(400, reason.message) try: token_decoded = jwt.decode(token, jwtsecret) # throw away value except jwt.ExpiredSignature: bottle.abort(401, {'code': 'token_expired', 'description': 'token is expired'}) except jwt.DecodeError, message: bottle.abort(401, {'code': 'token_invalid', 'description': message.message}) return f(*args, **kwargs) return decorated
In the above code the requires_auth(f)
function makes use of a helper function to verify that there is an Authorization
header and that it appears to contain the expected token. A custom exception is used to indicate a failure to identify a token in the header.
The requires_auth function then uses the python JWT library to decode the key based on a secret value jwtsecret
. The secret is obtained from a config object. Assuming the JWT decodes and is not expired, the decorated function will then be called.
Authenticate
The following function can be use to generate a new JWT.
jwtexpireoffset = config.authentication.get('jwtexpireoffset') jwtalgorithm = config.authentication.get('jwtalgorithm') def build_profile(credentials): return {'user': credentials['user'], 'role1': credentials['role1'], 'role2': credentials['role2'], 'exp': time.time()+jwtexpireoffset} bottle.post('/authenticate') def authenticate(): # extract credentials from the request credentials = bottle.request.json if not credentials or 'user' not in credentials or 'password' not in credentials: bottle.abort(400, 'Missing or bad credentials') # authenticate against some identity source, such as LDAP or a database try: # query database for username and confirm password # or send a query to LDAP or oAuth except Exception, error_message: logging.exception("Authentication failure") bottle.abort(403, 'Authentication failed for %s: %s' % (credentials['user'], error_message)) credentials['role1'] = is_authorized_role1(credentials['user']) credentials['role2'] = is_authorized_role2(credentials['user']) token = jwt.encode(build_profile(credentials), jwtsecret, algorithm=jwtalgorithm) logging.info('Authentication successful for %s' % (credentials['user'])) return {'token': token} |
jwtexpireoffset = config.authentication.get('jwtexpireoffset') jwtalgorithm = config.authentication.get('jwtalgorithm') def build_profile(credentials): return {'user': credentials['user'], 'role1': credentials['role1'], 'role2': credentials['role2'], 'exp': time.time()+jwtexpireoffset} bottle.post('/authenticate') def authenticate(): # extract credentials from the request credentials = bottle.request.json if not credentials or 'user' not in credentials or 'password' not in credentials: bottle.abort(400, 'Missing or bad credentials') # authenticate against some identity source, such as LDAP or a database try: # query database for username and confirm password # or send a query to LDAP or oAuth except Exception, error_message: logging.exception("Authentication failure") bottle.abort(403, 'Authentication failed for %s: %s' % (credentials['user'], error_message)) credentials['role1'] = is_authorized_role1(credentials['user']) credentials['role2'] = is_authorized_role2(credentials['user']) token = jwt.encode(build_profile(credentials), jwtsecret, algorithm=jwtalgorithm) logging.info('Authentication successful for %s' % (credentials['user'])) return {'token': token}
Notice that two additional values are stored in the global configuration, jwtalgorithm
and jwtexpireoffset
. These are used along with jwtsecret
to encode the JWT token. The actual verification of user credentials can happen in many ways, including direct access to a datastore, LDAP, oAuth, etc. After authenticating credentials, it’s easy to authorize a user based on roles. These could be implemented as separate functions and could confirm role based access based on LDAP group membership, database records, oAuth scopes, etc. While the role level access shown above looks binary, it could easily be more granular. Since a JWT is based on JSON, the JWT payload is represented as a JSON serializable python dictionary. Finally the token is returned.
Protected resources
At this point, any protected resource can be decorated, as shown below.
def get_jwt_credentials(): # get and decode the current token token = jwt_token_from_header() credentials = jwt.decode(token, jwtsecret) return credentials @appv1.get('/protected/resource') @requires_auth def get_protected_resource(): # get user details from JWT authenticated_user = get_jwt_credentials() # get protected resource try: return {'resource': somedao.find_protected_resource_by_username(authenticated_user['username'])} except Exception, e: logging.exception("Resource not found") bottle.abort(404, 'No resource for username %s was found.' % authenticated_user['username']) |
def get_jwt_credentials(): # get and decode the current token token = jwt_token_from_header() credentials = jwt.decode(token, jwtsecret) return credentials @appv1.get('/protected/resource') @requires_auth def get_protected_resource(): # get user details from JWT authenticated_user = get_jwt_credentials() # get protected resource try: return {'resource': somedao.find_protected_resource_by_username(authenticated_user['username'])} except Exception, e: logging.exception("Resource not found") bottle.abort(404, 'No resource for username %s was found.' % authenticated_user['username'])
The function get_protected_resource
will only be executed if requires_auth
successfully validates a JWT in the header of the request. The function get_jwt_credentials
will actually retrieve the JWT payload to be used in the function. While I don’t show an implementation of somedao
, it is simply an encapsulation point to facilitate access to resources.
Since the JWT expires (optionally, but a good idea), it’s necessary to build in some way to extend the ‘session’. For this a refresh endpoint can be provided as follows.
bottle.post('/authenticate/refresh') @requires_auth def refresh_token(): """refresh the current JWT""" # get and decode the current token token = jwt_token_from_header() payload = jwt.decode(token, jwtsecret) # create a new token with a new exp time token = jwt.encode(build_profile(payload), jwtsecret, algorithm=jwtalgorithm) return {'token': token} |
bottle.post('/authenticate/refresh') @requires_auth def refresh_token(): """refresh the current JWT""" # get and decode the current token token = jwt_token_from_header() payload = jwt.decode(token, jwtsecret) # create a new token with a new exp time token = jwt.encode(build_profile(payload), jwtsecret, algorithm=jwtalgorithm) return {'token': token}
This simply repackages the same payload with a new expiration time.
Improvements
The need to explicitly refresh the JWT increases (possibly double) the number of requests made to an API only for the purpose of extending session life. This is inefficient and can lead to awkward UI design. If possible, it would be convenient to refactor requires_auth
to perform the JWT refresh and add the new JWT to the header of the request that is about to be processed. The UI could then grab the updated JWT that is produced with each request to use for the subsequent request.
The design above will actually decode the JWT twice for any resource function that requires access to the JWT payload. If possible, it would be better to find some way to inject the JWT payload into the decorated function. Ideally this would be done in a way that functions which don’t need the JWT payload aren’t required to add it to their contract.
The authenticate function could be modified to return the JWT as a header, rather than in the body of the request. This may decrease the chances of a JWT being cached or logged. It would also simplify the UI if the authenticate and refresh functions both return the JWT in the same manner.
Extending
This same implementation could be reproduced in any language or framework. The basic steps are
- Perform (role based) authentication and authorization against some identity resource
- Generate a token (like JWT) indicating success and optionally containing information about the authenticated user
- Transmit the token and refreshed tokens in HTTP Authorization headers, both for authenticate and resource requests
Security and Risks
At the heart of JWT security is the secret used to sign the JWT. If this secret is too simple, or if it is leaked, it would be possible for a third party to craft a JWT with any desired payload, and trick an application into delivering protected resources to an attacker. It is important to choose strong secrets and to rotate them frequently. It would also be wise to perform additional validity steps. These might include tracking how many sessions a user has, where those session have originated and the nature and frequency/speed of requests. These additional measures could prevent attacks in the event that a JWT secret was discovered and may indicate a need to rotate a secret.
Microservices
In microservice environments, it is appealing to authenticate once and access multiple microservice endpoints using the same JWT. Since a JWT is stateless, each microservice only needs the JWT secret in order to validate the signature. This potentially increases the attach surface for a hacker who wants the JWT secret.