path: root/django/engagementmanager/service/authorization_service.py
Diffstat (limited to 'django/engagementmanager/service/authorization_service.py')
-rw-r--r--django/engagementmanager/service/authorization_service.py142
1 files changed, 90 insertions, 52 deletions
diff --git a/django/engagementmanager/service/authorization_service.py b/django/engagementmanager/service/authorization_service.py
index c850b4a..7a30d0e 100644
--- a/django/engagementmanager/service/authorization_service.py
+++ b/django/engagementmanager/service/authorization_service.py
@@ -1,5 +1,5 @@
-#
-# ============LICENSE_START==========================================
+#
+# ============LICENSE_START==========================================
# org.onap.vvp/engagementmgr
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -39,8 +39,8 @@
import json
from enum import Enum
import rest_framework
-from engagementmanager.models import Role, Engagement, Checklist, NextStep, VFC, \
- VF, ChecklistDecision, Notification
+from engagementmanager.models import Role, Engagement, Checklist, NextStep, \
+ VFC, VF, ChecklistDecision, Notification
from engagementmanager.utils.constants import Roles
from engagementmanager.utils.request_data_mgr import request_data_mgr
from engagementmanager.service.logging_service import LoggingServiceFactory
@@ -119,8 +119,10 @@ class Permissions(Enum):
class AuthorizationService:
"""
- The Authorization Service detemines whether a given action is authorized for a specific user.
- The method: is_user_able_to performs the authorization check given a user and an action (from Permissions ENUM)
+ The Authorization Service detemines whether a given action is authorized
+ for a specific user.
+ The method: is_user_able_to performs the authorization check given a user
+ and an action (from Permissions ENUM)
"""
role_standard_user = None
role_el = None
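Review bot: The reflowed class docstring still carries the typo `detemines` on its first new line; since the line is being rewritten anyway, `The Authorization Service determines whether a given action is authorized`. Nothing else in this hunk changes behaviour.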
@@ -129,8 +131,10 @@ class AuthorizationService:
def __get_role_checks(self, user, action):
"""
- Returns the list of auth checks that should be performed per user action.
- Returns None if the action is not listed in the authorized actions of the given user.
+ Returns the list of auth checks that should be \
+ performed per user action.
+ Returns None if the action is not listed in the \
+ authorized actions of the given user.
"""
result = None
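Review bot: The added docstring lines end in `\`, which inside a triple-quoted string is a line continuation: the newline is swallowed and the next line's indentation is pasted into the rendered text. The same pattern is added in the `__require_eng_membership` and `__require_peer_review_ownership` docstrings in the following two hunks. A docstring can simply wrap at the column limit, e.g. `Returns the list of auth checks that should be performed per user` followed by `action.` on the next line, with no backslash. `__get_role_checks` continues past the hunk.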
@@ -138,33 +142,40 @@ class AuthorizationService:
if (user.role == self.role_el) and (action in self.el_permissions):
result = self.el_permissions[action]
# ADMIN #
- elif user.role == self.role_admin and action in self.admin_permissions:
+ elif user.role == self.role_admin and action in \
+ self.admin_permissions:
result = self.admin_permissions[action]
# ADMIN Read only #
- elif user.role == self.role_admin_ro and action in self.admin_ro_permissions:
+ elif user.role == self.role_admin_ro and action in \
+ self.admin_ro_permissions:
result = self.admin_ro_permissions[action]
# STANDRARD_USER #
- if user.role == self.role_standard_user and action in self.standard_user_permissions:
+ if user.role == self.role_standard_user and \
+ action in self.standard_user_permissions:
result = self.standard_user_permissions[action]
return result
def __require_eng_membership(self, user, action, **kwargs):
"""
- Determines whether a given user is part of engagement team by the eng uuid
+ Determines whether a given user is part of engagement \
+ team by the eng uuid.
user = IceUser
eng = UUID as a string
:param user: user for auth check
:param action: action for auth check
:param kwargs: eng_uuid, checklist_uuid, ...
- :return: Boolean, Message -> True/False if auth check succeeds/fails and a message describing auth failure
+ :return: Boolean, Message -> True/False if auth check \
+ succeeds/fails and a message describing auth failure
"""
eng = kwargs['eng']
try:
# @UndefinedVariable
- if (user.email == eng.reviewer.email or user.email == eng.peer_reviewer.email or user.role.name == Roles.admin.name):
+ if (user.email == eng.reviewer.email or
+ user.email == eng.peer_reviewer.email or
+ user.role.name == Roles.admin.name):
return True, 'OK'
else:
# validate if user in Team
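Review bot: The standard-user branch at `if user.role == self.role_standard_user and \` is a separate `if` following the `el`/`admin`/`admin_ro` `elif` chain, so it is evaluated even after an earlier branch matched and would overwrite `result` if a user's role ever compared equal to two of the loaded role objects. Making it `elif user.role == self.role_standard_user and \` keeps the chain exclusive and makes the intended disjointness of roles explicit. `__require_eng_membership` continues past the hunk.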
@@ -173,33 +184,39 @@ class AuthorizationService:
else:
return False, ""
except Engagement.DoesNotExist:
- msg = 'User ' + user.email + ' is not a member of engagement: ' + eng.uuid + \
- ' / User is a not peer reviewer / admin of the engagement / Engagement wasnt found while fetching from DB'
+ msg = 'User ' + user.email + ' is not a member of engagement: ' + \
+ eng.uuid + \
+ ' / User is a not peer reviewer / admin of the ' +\
+ 'engagement / Engagement wasnt found while fetching from DB'
logger.info(msg)
return False, msg
except Exception as e:
print(e)
- msg = 'A general error occurred while trying to validate that User ' + \
- user.email + ' is a member of engagement '
+ msg = 'A general error occurred while trying to validate ' +\
+ 'that User ' + user.email + ' is a member of engagement '
logger.info(msg + " Error: " + str(e))
return False, msg
def __require_peer_review_ownership(self, user, action, **kwargs):
"""
- Determines whether the given user is the peer reviewer of the checklist
+ Determines whether the given user is \
+ the peer reviewer of the checklist
"""
cl = kwargs['cl']
eng = kwargs['eng']
if cl and eng:
# @UndefinedVariable
- if (eng.peer_reviewer == user and cl.owner == user) or (user.role.name == Roles.admin.name):
+ if (eng.peer_reviewer == user and cl.owner == user) or \
+ (user.role.name == Roles.admin.name):
return True, 'OK'
else:
- return False, 'User is either not the owner of the checklist or not a peer reviewer of the checklist'
+ return False, 'User is either not the owner of ' +\
+ 'the checklist or not a peer reviewer of the checklist'
else:
logger.error(
- 'Internal Error - Checklist/Engagement not found while trying to check permissions for user ' + user.email)
+ 'Internal Error - Checklist/Engagement not found while ' +
+ 'trying to check permissions for user ' + user.email)
return False, 'Internal Error - Checklist not found'
def __require_cl_ownership(self, user, action, **kwargs):
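Review bot: The general-exception branch of `__require_eng_membership` still prints the exception to stdout with `print(e)` before logging it at info level; for an authorization check that fails closed, the traceback belongs in the log, so drop the `print` and use `logger.exception(msg)` in place of `logger.info(msg + " Error: " + str(e))`. The team-membership denial at `return False, ""` returns an empty message, unlike every other denial in this class, which returns a descriptive one. `__require_eng_membership` starts before the hunk.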
@@ -216,7 +233,8 @@ class AuthorizationService:
return False, 'User is not the owner of the checklist'
else:
logger.error(
- 'Internal Error - Checklist not found while trying to check permissions for user ' + user.email)
+ 'Internal Error - Checklist not found while trying to ' +
+ 'check permissions for user ' + user.email)
return False, 'Internal Error - Checklist not found'
def __require_el_of_engagement(self, user, action, **kwargs):
@@ -234,7 +252,8 @@ class AuthorizationService:
return False, 'Role Not authorized'
else:
logger.error(
- 'Internal Error - Engagement not found while trying to check permissions for user ' + user.email)
+ 'Internal Error - Engagement not found while trying to ' +
+ 'check permissions for user ' + user.email)
return False, 'Internal Error - Checklist not found'
def __noop(self, user, action, **kwargs):
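Review bot: The engagement-not-found branch of `__require_el_of_engagement` logs "Engagement not found" but returns `'Internal Error - Checklist not found'`, evidently copied from `__require_cl_ownership` in the previous hunk; `return False, 'Internal Error - Engagement not found'` makes the returned reason match the condition. The function starts before the hunk.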
@@ -266,7 +285,8 @@ class AuthorizationService:
######################
"""
Each Permission Map is composed of the following key-val pairs:
- Key='Action (Permission ENUM)' --> Value='Set of Checks to perform on this action.'
+ Key='Action (Permission ENUM)' --> Value='Set of Checks to
+ perform on this action.'
"""
el_permissions = {
Permissions.add_vf: {__noop},
@@ -289,16 +309,20 @@ class AuthorizationService:
Permissions.set_checklist_decision: {__require_cl_ownership},
Permissions.add_checklist_audit_log: {__require_cl_ownership},
Permissions.delete_checklist_audit_log: {__require_cl_ownership},
- Permissions.el_review_checklist: {__require_cl_ownership, __require_eng_membership},
+ Permissions.el_review_checklist: {__require_cl_ownership,
+ __require_eng_membership},
Permissions.peer_review_checklist: {__require_peer_review_ownership},
- Permissions.handoff_checklist: {__require_cl_ownership, __require_eng_membership},
- Permissions.add_checklist_nextstep: {__require_cl_ownership, __require_eng_membership},
+ Permissions.handoff_checklist: {__require_cl_ownership,
+ __require_eng_membership},
+ Permissions.add_checklist_nextstep: {__require_cl_ownership,
+ __require_eng_membership},
Permissions.edit_nextstep: {__require_eng_membership},
Permissions.is_el_of_eng: {__require_el_of_engagement},
Permissions.update_personal_next_step: {__noop},
Permissions.create_checklist_audit_log: {__require_eng_membership},
Permissions.create_checklist_decision: {__require_eng_membership},
- Permissions.update_checklist_state: {__require_cl_ownership, __require_eng_membership},
+ Permissions.update_checklist_state: {__require_cl_ownership,
+ __require_eng_membership},
Permissions.create_deployment_target_site: {__require_eng_membership},
Permissions.star_an_engagement: {__noop},
Permissions.invite: {__require_eng_membership},
@@ -338,7 +362,8 @@ class AuthorizationService:
Permissions.update_vf: {__require_eng_membership},
Permissions.reset_nextstep: {__require_eng_membership},
Permissions.update_personal_next_step: {__noop},
- Permissions.update_checklist_state: {__require_cl_ownership, __require_eng_membership},
+ Permissions.update_checklist_state: {__require_cl_ownership,
+ __require_eng_membership},
Permissions.create_deployment_target_site: {__require_eng_membership},
Permissions.star_an_engagement: {__noop},
Permissions.invite: {__require_eng_membership},
@@ -368,7 +393,8 @@ class AuthorizationService:
admin_permissions.update( # Add Extra permissions to admin
{
Permissions.admin_approve_checklist: {__require_cl_ownership},
- Permissions.remove_from_engagement_team: {__require_eng_membership},
+ Permissions.remove_from_engagement_team: {
+ __require_eng_membership},
Permissions.view_checklist_template: {__noop},
Permissions.edit_checklist_template: {__noop},
Permissions.archive_engagement: {__noop},
@@ -403,7 +429,8 @@ class AuthorizationService:
)
def __init__(self):
- self.role_standard_user = self.role_el = self.role_admin = self.role_admin_ro = None
+ self.role_standard_user = self.role_el = self.role_admin = \
+ self.role_admin_ro = None
self.__load_roles_from_db()
def check_permissions(self, user, action, eng_uuid, role, eng, cl):
@@ -411,7 +438,8 @@ class AuthorizationService:
# role and action
perm_checks = self.__get_role_checks(user, action)
if not perm_checks:
- # Permission Checks were not found, it means that the action is not listed in the permitted
+ # Permission Checks were not found, it means that the action is
+ # not listed in the permitted
# actions for the role of the user
ret = False, 'Role ' + str(role.name) + ' is not permitted to ' + \
str(action.name) + '/ Engagement: ' + \
@@ -456,7 +484,8 @@ class AuthorizationService:
eng = Engagement.objects.get(uuid=eng_uuid)
except Engagement.DoesNotExist:
logger.error(
- 'ENG was not found while checking permissions... returning 500')
+ 'ENG was not found while checking permissions... ' +
+ 'returning 500')
return None, None
try:
@@ -490,7 +519,7 @@ class AuthorizationService:
else:
# Extract eng_uuid from request body
for arg in args:
- if eng_uuid != None:
+ if eng_uuid:
break
if isinstance(arg, rest_framework.request.Request):
try:
@@ -512,7 +541,8 @@ class AuthorizationService:
if 'eng_uuid' in data and data['eng_uuid']:
eng_uuid = data['eng_uuid']
- elif 'engagement_uuid' in data and data['engagement_uuid']:
+ elif 'engagement_uuid' in data and \
+ data['engagement_uuid']:
eng_uuid = data['engagement_uuid']
except Exception as e:
print(e)
@@ -523,80 +553,88 @@ class AuthorizationService:
# Extract CHECKLIST_UUID #
if 'checklistUuid' in kwargs:
request_data_mgr.set_cl_uuid(kwargs['checklistUuid'])
- if (eng_uuid == None):
+ if not eng_uuid:
try:
eng_uuid = Checklist.objects.get(
uuid=request_data_mgr.get_cl_uuid()).engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
except Checklist.DoesNotExist:
- raise Exception("auth service couldn't fetch Checklist by checklist uuid=" +
+ raise Exception("auth service couldn't fetch Checklist " +
+ "by checklist uuid=" +
request_data_mgr.get_cl_uuid())
except Exception as e:
raise Exception(
- "Failed fetching engagement uuid from checklist " + request_data_mgr.get_cl_uuid())
+ "Failed fetching engagement uuid from checklist "
+ + request_data_mgr.get_cl_uuid())
# Extract engagement by NEXTSTEP_UUID #
if 'ns_uuid' in kwargs:
request_data_mgr.set_ns_uuid(kwargs['ns_uuid'])
- if (eng_uuid == None):
+ if not eng_uuid:
next_step = None
try:
next_step = NextStep.objects.get(
uuid=request_data_mgr.get_ns_uuid())
except NextStep.DoesNotExist:
- raise Exception("auth service couldn't fetch NextStep by nextstep uuid=" +
+ raise Exception("auth service couldn't" +
+ "fetch NextStep by nextstep uuid=" +
request_data_mgr.get_ns_uuid())
try:
eng_uuid = next_step.engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
- except:
- # If we've gotten here it means that the next_step doesn't have attached
+ except BaseException:
+ # If we've gotten here it means that the next_step
+ # doesn't have attached
# engagement (e.g personal next_step)
pass
# Extract engagement by VFC
if ('uuid' in kwargs):
from engagementmanager.rest.vfc import VFCRest
- if (isinstance(args[0], VFCRest) == True):
+ if (isinstance(args[0], VFCRest)):
try:
vfc = VFC.objects.get(uuid=kwargs['uuid'])
- if (eng_uuid == None):
+ if not eng_uuid:
eng_uuid = vfc.vf.engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
except VFC.DoesNotExist:
raise Exception(
- "auth service couldn't fetch vfc by vfc uuid=" + kwargs['uuid'])
+ "auth service couldn't fetch vfc by vfc uuid="
+ + kwargs['uuid'])
# Extract engagement by VF (unfortunately the url exposed by the server
# get uuid as a parameter and serve both vf and vfc APIs) #
- if 'vf_uuid' in kwargs and eng_uuid == None:
+ if 'vf_uuid' in kwargs and not eng_uuid:
try:
eng_uuid = VF.objects.get(
uuid=kwargs['vf_uuid']).engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
except VF.DoesNotExist:
logger.error(
- "Prepare_data_for_auth: Couldn't fetch engagement object from VF, trying to fetch from VFC...")
+ "Prepare_data_for_auth: Couldn't fetch engagement " +
+ "object from VF, trying to fetch from VFC...")
vfc = None
try:
vfc = VFC.objects.get(uuid=kwargs['vf_uuid'])
- if (vfc != None):
+ if vfc:
eng_uuid = vfc.vf.engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
except VFC.DoesNotExist:
logger.error(
- "Prepare_data_for_auth: Couldn't fetch engagement object from VFC")
+ "Prepare_data_for_auth: Couldn't fetch engagement " +
+ "object from VFC")
# Extract engagement by ChecklistDecision
- if 'decision_uuid' in kwargs and eng_uuid == None:
+ if 'decision_uuid' in kwargs and not eng_uuid:
try:
eng_uuid = ChecklistDecision.objects.get(
uuid=kwargs['decision_uuid']).checklist.engagement.uuid
request_data_mgr.set_eng_uuid(eng_uuid)
except ChecklistDecision.DoesNotExist:
logger.error(
- "Prepare_data_for_auth: Couldn't fetch engagement object from ChecklistDecision")
+ "Prepare_data_for_auth: Couldn't fetch " +
+ "engagement object from ChecklistDecision")
# Extract notification uuid for permission check
if 'notif_uuid' in kwargs:
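Review bot: The re-split NextStep error string at `"auth service couldn't" +` / `"fetch NextStep by nextstep uuid=" +` has lost the space between the two pieces, so the raised message reads "couldn'tfetch"; `"auth service couldn't " +` restores it. The `except: pass` just below has become `except BaseException: pass`, which still swallows SystemExit and KeyboardInterrupt; the comment says the next step merely has no attached engagement, which presumably surfaces as an `AttributeError` on `next_step.engagement.uuid`, so `except AttributeError:` would be the narrow form — confirm against the model. The checklist branch's `except Exception as e:` re-raises a new `Exception` without `from e`, discarding the original cause. The enclosing method begins before the hunk.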