diff options
Diffstat (limited to 'django/engagementmanager/service/authorization_service.py')
-rw-r--r-- | django/engagementmanager/service/authorization_service.py | 142 |
1 files changed, 90 insertions, 52 deletions
diff --git a/django/engagementmanager/service/authorization_service.py b/django/engagementmanager/service/authorization_service.py index c850b4a..7a30d0e 100644 --- a/django/engagementmanager/service/authorization_service.py +++ b/django/engagementmanager/service/authorization_service.py @@ -1,5 +1,5 @@ -# -# ============LICENSE_START========================================== +# +# ============LICENSE_START========================================== # org.onap.vvp/engagementmgr # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -39,8 +39,8 @@ import json from enum import Enum import rest_framework -from engagementmanager.models import Role, Engagement, Checklist, NextStep, VFC, \ - VF, ChecklistDecision, Notification +from engagementmanager.models import Role, Engagement, Checklist, NextStep, \ + VFC, VF, ChecklistDecision, Notification from engagementmanager.utils.constants import Roles from engagementmanager.utils.request_data_mgr import request_data_mgr from engagementmanager.service.logging_service import LoggingServiceFactory @@ -119,8 +119,10 @@ class Permissions(Enum): class AuthorizationService: """ - The Authorization Service detemines whether a given action is authorized for a specific user. - The method: is_user_able_to performs the authorization check given a user and an action (from Permissions ENUM) + The Authorization Service detemines whether a given action is authorized + for a specific user. + The method: is_user_able_to performs the authorization check given a user + and an action (from Permissions ENUM) """ role_standard_user = None role_el = None @@ -129,8 +131,10 @@ class AuthorizationService: def __get_role_checks(self, user, action): """ - Returns the list of auth checks that should be performed per user action. - Returns None if the action is not listed in the authorized actions of the given user. + Returns the list of auth checks that should be \ + performed per user action. + Returns None if the action is not listed in the \ + authorized actions of the given user. """ result = None @@ -138,33 +142,40 @@ class AuthorizationService: if (user.role == self.role_el) and (action in self.el_permissions): result = self.el_permissions[action] # ADMIN # - elif user.role == self.role_admin and action in self.admin_permissions: + elif user.role == self.role_admin and action in \ + self.admin_permissions: result = self.admin_permissions[action] # ADMIN Read only # - elif user.role == self.role_admin_ro and action in self.admin_ro_permissions: + elif user.role == self.role_admin_ro and action in \ + self.admin_ro_permissions: result = self.admin_ro_permissions[action] # STANDRARD_USER # - if user.role == self.role_standard_user and action in self.standard_user_permissions: + if user.role == self.role_standard_user and \ + action in self.standard_user_permissions: result = self.standard_user_permissions[action] return result def __require_eng_membership(self, user, action, **kwargs): """ - Determines whether a given user is part of engagement team by the eng uuid + Determines whether a given user is part of engagement \ + team by the eng uuid. user = IceUser eng = UUID as a string :param user: user for auth check :param action: action for auth check :param kwargs: eng_uuid, checklist_uuid, ... - :return: Boolean, Message -> True/False if auth check succeeds/fails and a message describing auth failure + :return: Boolean, Message -> True/False if auth check \ + succeeds/fails and a message describing auth failure """ eng = kwargs['eng'] try: # @UndefinedVariable - if (user.email == eng.reviewer.email or user.email == eng.peer_reviewer.email or user.role.name == Roles.admin.name): + if (user.email == eng.reviewer.email or + user.email == eng.peer_reviewer.email or + user.role.name == Roles.admin.name): return True, 'OK' else: # validate if user in Team @@ -173,33 +184,39 @@ class AuthorizationService: else: return False, "" except Engagement.DoesNotExist: - msg = 'User ' + user.email + ' is not a member of engagement: ' + eng.uuid + \ - ' / User is a not peer reviewer / admin of the engagement / Engagement wasnt found while fetching from DB' + msg = 'User ' + user.email + ' is not a member of engagement: ' + \ + eng.uuid + \ + ' / User is a not peer reviewer / admin of the ' +\ + 'engagement / Engagement wasnt found while fetching from DB' logger.info(msg) return False, msg except Exception as e: print(e) - msg = 'A general error occurred while trying to validate that User ' + \ - user.email + ' is a member of engagement ' + msg = 'A general error occurred while trying to validate ' +\ + 'that User ' + user.email + ' is a member of engagement ' logger.info(msg + " Error: " + str(e)) return False, msg def __require_peer_review_ownership(self, user, action, **kwargs): """ - Determines whether the given user is the peer reviewer of the checklist + Determines whether the given user is \ + the peer reviewer of the checklist """ cl = kwargs['cl'] eng = kwargs['eng'] if cl and eng: # @UndefinedVariable - if (eng.peer_reviewer == user and cl.owner == user) or (user.role.name == Roles.admin.name): + if (eng.peer_reviewer == user and cl.owner == user) or \ + (user.role.name == Roles.admin.name): return True, 'OK' else: - return False, 'User is either not the owner of the checklist or not a peer reviewer of the checklist' + return False, 'User is either not the owner of ' +\ + 'the checklist or not a peer reviewer of the checklist' else: logger.error( - 'Internal Error - Checklist/Engagement not found while trying to check permissions for user ' + user.email) + 'Internal Error - Checklist/Engagement not found while ' + + 'trying to check permissions for user ' + user.email) return False, 'Internal Error - Checklist not found' def __require_cl_ownership(self, user, action, **kwargs): @@ -216,7 +233,8 @@ class AuthorizationService: return False, 'User is not the owner of the checklist' else: logger.error( - 'Internal Error - Checklist not found while trying to check permissions for user ' + user.email) + 'Internal Error - Checklist not found while trying to ' + + 'check permissions for user ' + user.email) return False, 'Internal Error - Checklist not found' def __require_el_of_engagement(self, user, action, **kwargs): @@ -234,7 +252,8 @@ class AuthorizationService: return False, 'Role Not authorized' else: logger.error( - 'Internal Error - Engagement not found while trying to check permissions for user ' + user.email) + 'Internal Error - Engagement not found while trying to ' + + 'check permissions for user ' + user.email) return False, 'Internal Error - Checklist not found' def __noop(self, user, action, **kwargs): @@ -266,7 +285,8 @@ class AuthorizationService: ###################### """ Each Permission Map is composed of the following key-val pairs: - Key='Action (Permission ENUM)' --> Value='Set of Checks to perform on this action.' + Key='Action (Permission ENUM)' --> Value='Set of Checks to + perform on this action.' """ el_permissions = { Permissions.add_vf: {__noop}, @@ -289,16 +309,20 @@ class AuthorizationService: Permissions.set_checklist_decision: {__require_cl_ownership}, Permissions.add_checklist_audit_log: {__require_cl_ownership}, Permissions.delete_checklist_audit_log: {__require_cl_ownership}, - Permissions.el_review_checklist: {__require_cl_ownership, __require_eng_membership}, + Permissions.el_review_checklist: {__require_cl_ownership, + __require_eng_membership}, Permissions.peer_review_checklist: {__require_peer_review_ownership}, - Permissions.handoff_checklist: {__require_cl_ownership, __require_eng_membership}, - Permissions.add_checklist_nextstep: {__require_cl_ownership, __require_eng_membership}, + Permissions.handoff_checklist: {__require_cl_ownership, + __require_eng_membership}, + Permissions.add_checklist_nextstep: {__require_cl_ownership, + __require_eng_membership}, Permissions.edit_nextstep: {__require_eng_membership}, Permissions.is_el_of_eng: {__require_el_of_engagement}, Permissions.update_personal_next_step: {__noop}, Permissions.create_checklist_audit_log: {__require_eng_membership}, Permissions.create_checklist_decision: {__require_eng_membership}, - Permissions.update_checklist_state: {__require_cl_ownership, __require_eng_membership}, + Permissions.update_checklist_state: {__require_cl_ownership, + __require_eng_membership}, Permissions.create_deployment_target_site: {__require_eng_membership}, Permissions.star_an_engagement: {__noop}, Permissions.invite: {__require_eng_membership}, @@ -338,7 +362,8 @@ class AuthorizationService: Permissions.update_vf: {__require_eng_membership}, Permissions.reset_nextstep: {__require_eng_membership}, Permissions.update_personal_next_step: {__noop}, - Permissions.update_checklist_state: {__require_cl_ownership, __require_eng_membership}, + Permissions.update_checklist_state: {__require_cl_ownership, + __require_eng_membership}, Permissions.create_deployment_target_site: {__require_eng_membership}, Permissions.star_an_engagement: {__noop}, Permissions.invite: {__require_eng_membership}, @@ -368,7 +393,8 @@ class AuthorizationService: admin_permissions.update( # Add Extra permissions to admin { Permissions.admin_approve_checklist: {__require_cl_ownership}, - Permissions.remove_from_engagement_team: {__require_eng_membership}, + Permissions.remove_from_engagement_team: { + __require_eng_membership}, Permissions.view_checklist_template: {__noop}, Permissions.edit_checklist_template: {__noop}, Permissions.archive_engagement: {__noop}, @@ -403,7 +429,8 @@ class AuthorizationService: ) def __init__(self): - self.role_standard_user = self.role_el = self.role_admin = self.role_admin_ro = None + self.role_standard_user = self.role_el = self.role_admin = \ + self.role_admin_ro = None self.__load_roles_from_db() def check_permissions(self, user, action, eng_uuid, role, eng, cl): @@ -411,7 +438,8 @@ class AuthorizationService: # role and action perm_checks = self.__get_role_checks(user, action) if not perm_checks: - # Permission Checks were not found, it means that the action is not listed in the permitted + # Permission Checks were not found, it means that the action is + # not listed in the permitted # actions for the role of the user ret = False, 'Role ' + str(role.name) + ' is not permitted to ' + \ str(action.name) + '/ Engagement: ' + \ @@ -456,7 +484,8 @@ class AuthorizationService: eng = Engagement.objects.get(uuid=eng_uuid) except Engagement.DoesNotExist: logger.error( - 'ENG was not found while checking permissions... returning 500') + 'ENG was not found while checking permissions... ' + + 'returning 500') return None, None try: @@ -490,7 +519,7 @@ class AuthorizationService: else: # Extract eng_uuid from request body for arg in args: - if eng_uuid != None: + if eng_uuid: break if isinstance(arg, rest_framework.request.Request): try: @@ -512,7 +541,8 @@ class AuthorizationService: if 'eng_uuid' in data and data['eng_uuid']: eng_uuid = data['eng_uuid'] - elif 'engagement_uuid' in data and data['engagement_uuid']: + elif 'engagement_uuid' in data and \ + data['engagement_uuid']: eng_uuid = data['engagement_uuid'] except Exception as e: print(e) @@ -523,80 +553,88 @@ class AuthorizationService: # Extract CHECKLIST_UUID # if 'checklistUuid' in kwargs: request_data_mgr.set_cl_uuid(kwargs['checklistUuid']) - if (eng_uuid == None): + if not eng_uuid: try: eng_uuid = Checklist.objects.get( uuid=request_data_mgr.get_cl_uuid()).engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) except Checklist.DoesNotExist: - raise Exception("auth service couldn't fetch Checklist by checklist uuid=" + + raise Exception("auth service couldn't fetch Checklist " + + "by checklist uuid=" + request_data_mgr.get_cl_uuid()) except Exception as e: raise Exception( - "Failed fetching engagement uuid from checklist " + request_data_mgr.get_cl_uuid()) + "Failed fetching engagement uuid from checklist " + + request_data_mgr.get_cl_uuid()) # Extract engagement by NEXTSTEP_UUID # if 'ns_uuid' in kwargs: request_data_mgr.set_ns_uuid(kwargs['ns_uuid']) - if (eng_uuid == None): + if not eng_uuid: next_step = None try: next_step = NextStep.objects.get( uuid=request_data_mgr.get_ns_uuid()) except NextStep.DoesNotExist: - raise Exception("auth service couldn't fetch NextStep by nextstep uuid=" + + raise Exception("auth service couldn't" + + "fetch NextStep by nextstep uuid=" + request_data_mgr.get_ns_uuid()) try: eng_uuid = next_step.engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) - except: - # If we've gotten here it means that the next_step doesn't have attached + except BaseException: + # If we've gotten here it means that the next_step + # doesn't have attached # engagement (e.g personal next_step) pass # Extract engagement by VFC if ('uuid' in kwargs): from engagementmanager.rest.vfc import VFCRest - if (isinstance(args[0], VFCRest) == True): + if (isinstance(args[0], VFCRest)): try: vfc = VFC.objects.get(uuid=kwargs['uuid']) - if (eng_uuid == None): + if not eng_uuid: eng_uuid = vfc.vf.engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) except VFC.DoesNotExist: raise Exception( - "auth service couldn't fetch vfc by vfc uuid=" + kwargs['uuid']) + "auth service couldn't fetch vfc by vfc uuid=" + + kwargs['uuid']) # Extract engagement by VF (unfortunately the url exposed by the server # get uuid as a parameter and serve both vf and vfc APIs) # - if 'vf_uuid' in kwargs and eng_uuid == None: + if 'vf_uuid' in kwargs and not eng_uuid: try: eng_uuid = VF.objects.get( uuid=kwargs['vf_uuid']).engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) except VF.DoesNotExist: logger.error( - "Prepare_data_for_auth: Couldn't fetch engagement object from VF, trying to fetch from VFC...") + "Prepare_data_for_auth: Couldn't fetch engagement " + + "object from VF, trying to fetch from VFC...") vfc = None try: vfc = VFC.objects.get(uuid=kwargs['vf_uuid']) - if (vfc != None): + if vfc: eng_uuid = vfc.vf.engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) except VFC.DoesNotExist: logger.error( - "Prepare_data_for_auth: Couldn't fetch engagement object from VFC") + "Prepare_data_for_auth: Couldn't fetch engagement " + + "object from VFC") # Extract engagement by ChecklistDecision - if 'decision_uuid' in kwargs and eng_uuid == None: + if 'decision_uuid' in kwargs and not eng_uuid: try: eng_uuid = ChecklistDecision.objects.get( uuid=kwargs['decision_uuid']).checklist.engagement.uuid request_data_mgr.set_eng_uuid(eng_uuid) except ChecklistDecision.DoesNotExist: logger.error( - "Prepare_data_for_auth: Couldn't fetch engagement object from ChecklistDecision") + "Prepare_data_for_auth: Couldn't fetch " + + "engagement object from ChecklistDecision") # Extract notification uuid for permission check if 'notif_uuid' in kwargs: |