aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
blob: b714ec6b3e251f074217a7b6eee282051244db5c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// -
//   ========================LICENSE_START=================================
//   Copyright (C) 2025: Deutsche Telekom
//
//   Licensed under the Apache License, Version 2.0 (the "License");
//   you may not use this file except in compliance with the License.
//   You may obtain a copy of the License at
//
//        http://www.apache.org/licenses/LICENSE-2.0
//
//   Unless required by applicable law or agreed to in writing, software
//   distributed under the License is distributed on an "AS IS" BASIS,
//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//   See the License for the specific language governing permissions and
//   limitations under the License.
//   SPDX-License-Identifier: Apache-2.0
//   ========================LICENSE_END===================================

// will process the update message from pap and send the pdp status response.
package handler

import (
	"context"
	"fmt"
	"path/filepath"
	"policy-opa-pdp/consts"
	"policy-opa-pdp/pkg/kafkacomm/publisher"
	"policy-opa-pdp/pkg/log"
	"policy-opa-pdp/pkg/metrics"
	"policy-opa-pdp/pkg/model"
	"policy-opa-pdp/pkg/opasdk"
	"policy-opa-pdp/pkg/policymap"
	"policy-opa-pdp/pkg/utils"
	"strings"
)

type (
	HandlePolicyUndeploymentFunc func(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string)
)

var (
	handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment

	removeDirectoryFunc = utils.RemoveDirectory

	deleteDataSdkFunc = opasdk.DeleteData

	deletePolicySdkFunc = opasdk.DeletePolicy

	removeDataDirectoryFunc = removeDataDirectory

	removePolicyDirectoryFunc = removePolicyDirectory

	policyUndeploymentActionFunc = policyUndeploymentAction

	removePolicyFromSdkandDirFunc = removePolicyFromSdkandDir

	removeDataFromSdkandDirFunc = removeDataFromSdkandDir
)

// processPoliciesTobeUndeployed handles the undeployment of policies
func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
	var failureMessages []string

	successfullyUndeployedPolicies := make(map[string]string)

	// Unmarshal the last known policies
	deployedPolicies, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
	if err != nil {
		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
	}

	for policyID, policyVersion := range undeployedPolicies {
		// Check if undeployed policy exists in deployedPolicies
		matchedPolicy := findDeployedPolicy(policyID, policyVersion, deployedPolicies)
		if matchedPolicy != nil {
			// Handle undeployment for the policy
			errs := policyUndeploymentActionFunc(matchedPolicy)
			if len(errs) > 0 {
				metrics.IncrementUndeployFailureCount()
				metrics.IncrementTotalErrorCount()
				failureMessages = append(failureMessages, errs...)
			}
			deployedPoliciesMap, err := policymap.RemoveUndeployedPoliciesfromMap(matchedPolicy)
			if err != nil {
				log.Warnf("Policy Name: %s, Version: %s is not removed from LastDeployedPolicies", policyID, policyVersion)
				failureMessages = append(failureMessages, "Error in removing from LastDeployedPolicies")
			}
			log.Debugf("Policies Map After Undeployment : %s", deployedPoliciesMap)
			metrics.IncrementUndeploySuccessCount()
			successfullyUndeployedPolicies[policyID] = policyVersion
		} else {
			// Log failure if no match is found
			log.Debugf("Policy Name: %s, Version: %s is marked for undeployment but was not deployed", policyID, policyVersion)
			continue
		}
	}

	totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
	metrics.SetTotalPoliciesCount(int64(totalPolicies))

	return failureMessages, successfullyUndeployedPolicies
}

func handlePolicyUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {

	// Extract undeployed policies into a dictionary
	undeployedPoliciesDict := extractUndeployedPolicies(pdpUpdate.PoliciesToBeUndeployed)

	// Process undeployment actions
	errorMessages, successfullyUndeployedPolicies := processPoliciesTobeUndeployed(undeployedPoliciesDict)

	return errorMessages, successfullyUndeployedPolicies

}

// ExtractUndeployedPolicies extracts policy names and versions into a map
func extractUndeployedPolicies(policies []model.ToscaConceptIdentifier) map[string]string {
	undeployedPoliciesDict := make(map[string]string)
	for _, policy := range policies {
		undeployedPoliciesDict[policy.Name] = policy.Version
		log.Infof("Extracted Policy Name: %s, Version: %s for undeployment", policy.Name, policy.Version)
	}
	return undeployedPoliciesDict
}

// HandlePolicyUndeployment processes the actual undeployment actions for a policy
func policyUndeploymentAction(policy map[string]interface{}) []string {
	var failureMessages []string

	// Delete "policy" sdk and directories
	policyErrors := removePolicyFromSdkandDirFunc(policy)
	failureMessages = append(failureMessages, policyErrors...)

	// Delete "data" sdk and directories
	dataErrors := removeDataFromSdkandDirFunc(policy)
	failureMessages = append(failureMessages, dataErrors...)

	return failureMessages
}

// removeDataFromSdkandDir handles the "data" directories in the policy
func removeDataFromSdkandDir(policy map[string]interface{}) []string {
	var failureMessages []string

	if dataKeys, ok := policy["data"].([]interface{}); ok {
		for _, dataKey := range dataKeys {
			keyPath := dataKey.(string)
			keyPath = "/" + strings.Replace(keyPath, ".", "/", -1)
			log.Debugf("Deleting data from OPA : %s", keyPath)
			if err := deleteDataSdkFunc(context.Background(), keyPath); err != nil {
				failureMessages = append(failureMessages, err.Error())
				continue
			}
			if err := removeDataDirectoryFunc(keyPath); err != nil {
				failureMessages = append(failureMessages, err.Error())
			}
		}
	} else {
		failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'data' is missing or not an array", policy["policy-id"], policy["policy-version"]))
	}

	return failureMessages
}

// removePolicyFromSdkandDir handles the "policy" directories in the policy
func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
	var failureMessages []string

	if policyKeys, ok := policy["policy"].([]interface{}); ok {
		for _, policyKey := range policyKeys {
			keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1)
			log.Debugf("Deleting Policy from OPA : %s", keyPath)
			if err := deletePolicySdkFunc(context.Background(), policyKey.(string)); err != nil {
				failureMessages = append(failureMessages, err.Error())
				continue
			}
			if err := removePolicyDirectoryFunc(keyPath); err != nil {
				failureMessages = append(failureMessages, err.Error())
			}
		}
	} else {
		failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'policy' is missing or not an array", policy["policy-id"], policy["policy-version"]))
	}

	return failureMessages
}

// RemoveDataDirectory removes a directory for data
func removeDataDirectory(dataKey string) error {
	dataPath := filepath.Join(consts.Data, dataKey)
	log.Debugf("Removing data directory: %s", dataPath)
	if err := removeDirectoryFunc(dataPath); err != nil {
		return fmt.Errorf("Failed to handle directory for data %s: %v", dataPath, err)
	}
	return nil
}

// RemovePolicyDirectory removes a directory for policies
func removePolicyDirectory(policyKey string) error {
	policyPath := filepath.Join(consts.Policies, policyKey)
	log.Debugf("Removing policy directory: %s", policyPath)
	if err := removeDirectoryFunc(policyPath); err != nil {
		return fmt.Errorf("Failed to handle directory for policy %s: %v", policyPath, err)
	}
	return nil
}

// findDeployedPolicy searches for a policy in deployedPolicies
func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[string]interface{}) map[string]interface{} {
	for _, policy := range deployedPolicies {
		// Extract policy-id and policy-version from the deployed policy
		id, idOk := policy["policy-id"].(string)
		version, versionOk := policy["policy-version"].(string)

		// Check if the deployed policy matches the undeployed policy
		if idOk && versionOk && id == policyID && version == policyVersion {
			return policy
		}
	}
	return nil
}