Source code for paradrop.backend.auth

import base64
import functools
import json

from klein import Klein

from paradrop.base import nexus, settings
from paradrop.base.output import out
from paradrop.core.chute.chute_storage import ChuteStorage
from . import cors

# TODO: Configurable username/password.
# TODO: Segment permissions based on roles.
#       Admin can do anything, but maybe a guest can still view device status.
#       Chutes should not be able to manipulate other chutes.

def get_username_password(userpass):
    """
    Decode a base64 "username:password" credential into its two parts.

    userpass is the credentials portion of an HTTP Basic Authorization
    header, e.g. "YWRtaW46cGFzc3dvcmQ=" for "admin:password".

    NOTE(review): an earlier version of this docstring said plain-text
    credentials such as "admin:password" should be accepted as well, but
    only base64 input is decoded here - confirm whether plain-text support
    is still wanted.

    Returns a two-element sequence (username, password). The password is ''
    when the decoded text contains no colon.

    Raises ValueError if the decoded text has more than two colon-separated
    segments. Errors raised by base64 decoding propagate to the caller.
    """
    plaintext = base64.b64decode(userpass)

    # b64decode returns bytes on Python 3; convert to text so that splitting
    # on a str separator works. On Python 2 the result is already a str.
    if not isinstance(plaintext, str):
        plaintext = plaintext.decode('utf-8')

    segments = plaintext.split(':')
    count = len(segments)
    if count == 1:
        return segments[0], ''
    elif count == 2:
        return segments
    else:
        raise ValueError('There should be at most two segments!')
def verify_password(password_manager, userpass):
    """
    Check encoded credentials against the password manager.

    userpass is split into a user name and password by
    get_username_password, and the pair is handed to
    password_manager.verify_password, whose result is returned unchanged.
    """
    credentials = get_username_password(userpass)
    user_name, password = credentials
    return password_manager.verify_password(user_name, password)
def get_allowed_bearer():
    """
    Return set of allowed bearer tokens.

    Collects the 'apiToken' cache entry of every chute in chute storage,
    skipping chutes for which that entry is None.
    """
    cached_tokens = (
        chute.getCache('apiToken')
        for chute in ChuteStorage().getChuteList()
    )
    return {token for token in cached_tokens if token is not None}
def check_auth(password_manager, token_manager, auth_header):
    """
    Decide whether an Authorization header value authenticates the caller.

    password_manager: used to verify "Basic" credentials.
    token_manager: used to decode "Bearer" tokens that are not chute tokens;
        must expose decode(token) and an InvalidTokenError exception class.
    auth_header: raw value of the Authorization header.

    Returns the result of the password check for "Basic" credentials, True
    for an accepted "Bearer" token, and False otherwise. A header that is
    empty, lacks a credentials part, or carries undecodable Basic
    credentials is treated as an authentication failure rather than an
    error.
    """
    parts = auth_header.split() if auth_header else []

    # A header needs both a scheme and credentials ("Basic xyz"); anything
    # shorter cannot authenticate and must not raise IndexError.
    if len(parts) < 2:
        return False

    scheme, credentials = parts[0], parts[1]

    if scheme == "Basic":
        try:
            return verify_password(password_manager, credentials)
        except (TypeError, ValueError):
            # Credentials that are not valid base64 or do not split into
            # username/password come from the client; reject them.
            return False

    elif scheme == "Bearer":
        token = credentials

        # Chutes use non-expiring random tokens that are generated at container
        # creation. This works well because we do not want to deal with
        # expiration or revocation.
        allowed_tokens = get_allowed_bearer()
        if token in allowed_tokens:
            return True

        # Users (through the local portal or pdtools) can acquire expiring
        # JWTs. If the JWT decodes using our secret, then the caller is
        # authenticated.
        #
        # Later on, we may accept tokens issued by another party that allow
        # user access to the router, etc. In that case we will need to
        # examine the subject, issuer, and audience claims.
        try:
            token_manager.decode(token)
            return True
        except token_manager.InvalidTokenError:
            pass

    return False
def requires_auth(func):
    """
    Decorator that makes an API method require authorization.

    The wrapped method runs only when the request carries an Authorization
    header accepted by check_auth (username and password or a bearer
    token). Otherwise a warning is logged, the response code is set to 401
    with a WWW-Authenticate header, and nothing is returned.
    """
    @functools.wraps(func)
    def decorated(self, request, *args, **kwargs):
        header = request.getHeader('Authorization')
        if header and check_auth(self.password_manager, self.token_manager,
                                 header):
            return func(self, request, *args, **kwargs)

        # Missing or rejected credentials: tell the client to authenticate.
        out.warn('Failed to authenticate')
        request.setResponseCode(401)
        request.setHeader("WWW-Authenticate", "Basic realm=\"Login Required\"")
        return None

    return decorated
class AuthApi(object):
    """
    Authentication endpoints, routed with Klein.
    """

    routes = Klein()

    def __init__(self, password_manager, token_manager):
        """
        Store the collaborators used by the login route.

        password_manager: provides verify_password(username, password) and
            the DEFAULT_USER_NAME / DEFAULT_PASSWORD attributes.
        token_manager: provides issue(username).
        """
        self.password_manager = password_manager
        self.token_manager = token_manager

    @routes.route('/local', methods=['POST'])
    def local_login(self, request):
        """
        Login using local authentication (username+password).

        Reads a JSON object from the request body. Missing 'username' or
        'password' keys fall back to the password manager's defaults.

        Returns a JSON string with 'username' and 'success', plus 'token'
        when the credentials are accepted. On failure the response code is
        set to 401.
        """
        cors.config_cors(request)
        request.setHeader('Content-Type', 'application/json')

        # NOTE(review): the argument of this json.loads call was missing in
        # the reviewed source; reading the request body is the presumed
        # intent - confirm against the upstream file.
        body = json.loads(request.content.read())
        username = body.get('username',
                            self.password_manager.DEFAULT_USER_NAME)
        password = body.get('password',
                            self.password_manager.DEFAULT_PASSWORD)

        success = self.password_manager.verify_password(username, password)
        result = {
            'username': username,
            'success': success
        }

        if success:
            token = self.token_manager.issue(username)
            result['token'] = token
        else:
            request.setResponseCode(401)

        return json.dumps(result)