1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
// -
// ========================LICENSE_START=================================
// Copyright (C) 2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// will process the update message from pap and send the pdp status response.
package handler
import (
"context"
"fmt"
"path/filepath"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
"strings"
)
type (
HandlePolicyUndeploymentFunc func(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string)
)
var (
handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment
removeDirectoryFunc = utils.RemoveDirectory
deleteDataSdkFunc = opasdk.DeleteData
deletePolicySdkFunc = opasdk.DeletePolicy
removeDataDirectoryFunc = removeDataDirectory
removePolicyDirectoryFunc = removePolicyDirectory
policyUndeploymentActionFunc = policyUndeploymentAction
removePolicyFromSdkandDirFunc = removePolicyFromSdkandDir
removeDataFromSdkandDirFunc = removeDataFromSdkandDir
)
// processPoliciesTobeUndeployed handles the undeployment of policies
func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
var failureMessages []string
successfullyUndeployedPolicies := make(map[string]string)
// Unmarshal the last known policies
deployedPolicies, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
if err != nil {
log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
}
for policyID, policyVersion := range undeployedPolicies {
// Check if undeployed policy exists in deployedPolicies
matchedPolicy := findDeployedPolicy(policyID, policyVersion, deployedPolicies)
if matchedPolicy != nil {
// Handle undeployment for the policy
errs := policyUndeploymentActionFunc(matchedPolicy)
if len(errs) > 0 {
metrics.IncrementUndeployFailureCount()
metrics.IncrementTotalErrorCount()
failureMessages = append(failureMessages, errs...)
}
deployedPoliciesMap, err := policymap.RemoveUndeployedPoliciesfromMap(matchedPolicy)
if err != nil {
log.Warnf("Policy Name: %s, Version: %s is not removed from LastDeployedPolicies", policyID, policyVersion)
failureMessages = append(failureMessages, "Error in removing from LastDeployedPolicies")
}
log.Debugf("Policies Map After Undeployment : %s", deployedPoliciesMap)
metrics.IncrementUndeploySuccessCount()
successfullyUndeployedPolicies[policyID] = policyVersion
} else {
// Log failure if no match is found
log.Debugf("Policy Name: %s, Version: %s is marked for undeployment but was not deployed", policyID, policyVersion)
continue
}
}
totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
metrics.SetTotalPoliciesCount(int64(totalPolicies))
return failureMessages, successfullyUndeployedPolicies
}
func handlePolicyUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {
// Extract undeployed policies into a dictionary
undeployedPoliciesDict := extractUndeployedPolicies(pdpUpdate.PoliciesToBeUndeployed)
// Process undeployment actions
errorMessages, successfullyUndeployedPolicies := processPoliciesTobeUndeployed(undeployedPoliciesDict)
return errorMessages, successfullyUndeployedPolicies
}
// ExtractUndeployedPolicies extracts policy names and versions into a map
func extractUndeployedPolicies(policies []model.ToscaConceptIdentifier) map[string]string {
undeployedPoliciesDict := make(map[string]string)
for _, policy := range policies {
undeployedPoliciesDict[policy.Name] = policy.Version
log.Infof("Extracted Policy Name: %s, Version: %s for undeployment", policy.Name, policy.Version)
}
return undeployedPoliciesDict
}
// HandlePolicyUndeployment processes the actual undeployment actions for a policy
func policyUndeploymentAction(policy map[string]interface{}) []string {
var failureMessages []string
// Delete "policy" sdk and directories
policyErrors := removePolicyFromSdkandDirFunc(policy)
failureMessages = append(failureMessages, policyErrors...)
// Delete "data" sdk and directories
dataErrors := removeDataFromSdkandDirFunc(policy)
failureMessages = append(failureMessages, dataErrors...)
return failureMessages
}
// removeDataFromSdkandDir handles the "data" directories in the policy
func removeDataFromSdkandDir(policy map[string]interface{}) []string {
var failureMessages []string
if dataKeys, ok := policy["data"].([]interface{}); ok {
for _, dataKey := range dataKeys {
keyPath := dataKey.(string)
keyPath = "/" + strings.Replace(keyPath, ".", "/", -1)
log.Debugf("Deleting data from OPA : %s", keyPath)
if err := deleteDataSdkFunc(context.Background(), keyPath); err != nil {
failureMessages = append(failureMessages, err.Error())
continue
}
if err := removeDataDirectoryFunc(keyPath); err != nil {
failureMessages = append(failureMessages, err.Error())
}
}
} else {
failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'data' is missing or not an array", policy["policy-id"], policy["policy-version"]))
}
return failureMessages
}
// removePolicyFromSdkandDir handles the "policy" directories in the policy
func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
var failureMessages []string
if policyKeys, ok := policy["policy"].([]interface{}); ok {
for _, policyKey := range policyKeys {
keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1)
log.Debugf("Deleting Policy from OPA : %s", keyPath)
if err := deletePolicySdkFunc(context.Background(), policyKey.(string)); err != nil {
failureMessages = append(failureMessages, err.Error())
continue
}
if err := removePolicyDirectoryFunc(keyPath); err != nil {
failureMessages = append(failureMessages, err.Error())
}
}
} else {
failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'policy' is missing or not an array", policy["policy-id"], policy["policy-version"]))
}
return failureMessages
}
// RemoveDataDirectory removes a directory for data
func removeDataDirectory(dataKey string) error {
dataPath := filepath.Join(consts.Data, dataKey)
log.Debugf("Removing data directory: %s", dataPath)
if err := removeDirectoryFunc(dataPath); err != nil {
return fmt.Errorf("Failed to handle directory for data %s: %v", dataPath, err)
}
return nil
}
// RemovePolicyDirectory removes a directory for policies
func removePolicyDirectory(policyKey string) error {
policyPath := filepath.Join(consts.Policies, policyKey)
log.Debugf("Removing policy directory: %s", policyPath)
if err := removeDirectoryFunc(policyPath); err != nil {
return fmt.Errorf("Failed to handle directory for policy %s: %v", policyPath, err)
}
return nil
}
// findDeployedPolicy searches for a policy in deployedPolicies
func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[string]interface{}) map[string]interface{} {
for _, policy := range deployedPolicies {
// Extract policy-id and policy-version from the deployed policy
id, idOk := policy["policy-id"].(string)
version, versionOk := policy["policy-version"].(string)
// Check if the deployed policy matches the undeployed policy
if idOk && versionOk && id == policyID && version == policyVersion {
return policy
}
}
return nil
}
|