1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
// -
// ========================LICENSE_START=================================
// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// Processes the update message from PAP and sends the PDP status response.
package handler
import (
"encoding/json"
"fmt"
"os/exec"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpattributes"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
"strings"
)
// Function signatures used for the package-level function variables
// sendSuccessResponseVar, sendFailureResponseVar and createBundleFuncVar.
type (
// sendSuccessResponseFunc is the signature of sendSuccessResponse: it takes the
// status sender, the PDP update being answered and the response text.
sendSuccessResponseFunc func(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage string) error
// sendFailureResponseFunc is the signature of sendFailureResponse: it takes the
// status sender, the PDP update being answered and the error to report.
sendFailureResponseFunc func(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage error) error
// createBundleFuncRef is the signature of createBundleFunc: it takes a command
// factory shaped like exec.Command and a TOSCA policy, and returns a string
// result together with an error.
createBundleFuncRef func(execCmd func(string, ...string) *exec.Cmd, toscaPolicy model.ToscaPolicy) (string, error)
)
// Package-level settings and function variables used by the update handler.
var (
// basePolicyDir and baseDataDir are initialised from consts.Policies and
// consts.Data. NOTE(review): presumably the base directories for policy and
// data files — confirm against the consts package.
basePolicyDir = consts.Policies
baseDataDir = consts.Data
// The handler calls the response senders through these variables rather than
// directly. NOTE(review): presumably so they can be swapped out in tests — confirm.
sendSuccessResponseVar sendSuccessResponseFunc = sendSuccessResponse
sendFailureResponseVar sendFailureResponseFunc = sendFailureResponse
// createBundleFuncVar defaults to createBundleFunc.
createBundleFuncVar createBundleFuncRef = createBundleFunc
)
// pdpUpdateMessageHandler handles a PDP_UPDATE message sent from the Policy
// Administration Point (PAP).
//
// The message is unmarshalled into a model.PdpUpdate and validated with
// utils.ValidateFieldsStructs; if either step fails, a failure response is sent
// and the handler returns. Otherwise the PDP subgroup is stored, the requested
// policy deployments and undeployments are processed, a status response is sent
// through p, and the heartbeat ticker is restarted when the message carries a
// non-zero interval that differs from the current one.
//
// It returns the unmarshal or validation error, or the error from a response
// that could not be sent. Policy deployment and undeployment failures are
// reported to PAP in the response and are not returned; in that case, and on
// success, the result is nil.
func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
	var (
		pdpUpdate      model.PdpUpdate
		collectedErrs  []string
		policiesReport string
	)

	// rejectUpdate sends a failure response for cause. It returns the send
	// error when the response could not be sent, and cause otherwise.
	rejectUpdate := func(cause error) error {
		failure := fmt.Errorf("PDP Update Failed: %v", cause)
		if sendErr := sendFailureResponseVar(p, &pdpUpdate, failure); sendErr != nil {
			log.Debugf("Failed to send update error response: %v", sendErr)
			return sendErr
		}
		return cause
	}

	err := json.Unmarshal(message, &pdpUpdate)
	if err != nil {
		log.Debugf("Failed to UnMarshal Messages: %v\n", err)
		return rejectUpdate(err)
	}

	// Validate the struct fields once the payload has been decoded.
	if err = utils.ValidateFieldsStructs(pdpUpdate); err != nil {
		return rejectUpdate(err)
	}

	log.Debugf("PDP_UPDATE Message received: %s", string(message))
	pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)

	if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
		deployErrs, deployed := handlePolicyDeploymentVar(pdpUpdate, p)
		deployedJSON, formatErr := policymap.FormatMapofAnyType(deployed)
		if len(deployErrs) > 0 {
			collectedErrs = append(collectedErrs, "{Deployment Errors:"+strings.Join(deployErrs, "")+"}")
		}
		if formatErr != nil {
			collectedErrs = append(collectedErrs, "|Internal Map Error:"+formatErr.Error()+"|")
			failure := fmt.Errorf("PDP Update Failed: failed to format successfullyDeployedPolicies json %v", collectedErrs)
			// NOTE(review): when this send succeeds, processing continues and
			// sendPDPStatusResponse is later called with the same non-empty
			// error list, so a second failure response appears to be sent for
			// the same update — confirm this is intended.
			if sendErr := sendFailureResponseVar(p, &pdpUpdate, failure); sendErr != nil {
				log.Debugf("Failed to send update error response: %v", sendErr)
				return sendErr
			}
		}
		policiesReport = deployedJSON
	}

	// Process "PoliciesToBeUndeployed" when the message carries any.
	if len(pdpUpdate.PoliciesToBeUndeployed) > 0 {
		log.Infof("Found Policies to be undeployed")
		undeployErrs, undeployed := handlePolicyUndeploymentVar(pdpUpdate, p)
		undeployedJSON, formatErr := policymap.FormatMapofAnyType(undeployed)
		if len(undeployErrs) > 0 {
			collectedErrs = append(collectedErrs, "{UnDeployment Errors:"+strings.Join(undeployErrs, "")+"}")
		}
		if formatErr != nil {
			collectedErrs = append(collectedErrs, "|Internal Map Error:"+formatErr.Error()+"|")
			failure := fmt.Errorf("PDP Update Failed: failed to format successfullyUnDeployedPolicies json %v", collectedErrs)
			// NOTE(review): same apparent double failure response as in the
			// deployment branch above — confirm.
			if sendErr := sendFailureResponseVar(p, &pdpUpdate, failure); sendErr != nil {
				log.Debugf("Failed to send update error response: %v", sendErr)
				return sendErr
			}
		}
		// Overwrites the deployment summary when both lists are present.
		policiesReport = undeployedJSON
	}

	hasPolicyWork := len(pdpUpdate.PoliciesToBeDeployed) > 0 || len(pdpUpdate.PoliciesToBeUndeployed) > 0
	if hasPolicyWork {
		// Response for deployment, undeployment, or both arriving together.
		if sendErr := sendPDPStatusResponse(pdpUpdate, p, policiesReport, collectedErrs); sendErr != nil {
			return sendErr
		}
	} else {
		// Response for PAP registration: nothing to deploy or undeploy.
		if err = sendSuccessResponseVar(p, &pdpUpdate, "PDP UPDATE is successfull"); err != nil {
			log.Debugf("Failed to Send Update Response Message: %v\n", err)
			return err
		}
	}
	log.Infof("PDP_STATUS Message Sent Successfully")

	requestedInterval := pdpUpdate.PdpHeartbeatIntervalMs
	log.Debug(requestedInterval)
	if requestedInterval != 0 && requestedInterval != pdpattributes.PdpHeartbeatInterval {
		// Restart the ticker with the new interval.
		publisher.StopTicker()
		pdpattributes.SetPdpHeartbeatInterval(requestedInterval)
		go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
	}
	return nil
}
// createBundleFunc builds the bundle tar file by delegating to
// utils.BuildBundle with execCmd and returns that call's results unchanged.
// toscaPolicy is not used by this implementation; the parameter is kept so the
// function matches the createBundleFuncRef signature.
func createBundleFunc(execCmd func(string, ...string) *exec.Cmd, toscaPolicy model.ToscaPolicy) (string, error) {
	out, err := utils.BuildBundle(execCmd)
	return out, err
}
// sendSuccessResponse forwards respMessage for pdpUpdate to
// publisher.SendPdpUpdateResponse using p, and returns that call's error
// (nil when the call succeeded).
func sendSuccessResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage string) error {
	return publisher.SendPdpUpdateResponse(p, pdpUpdate, respMessage)
}
// sendFailureResponse forwards the error respMessage for pdpUpdate to
// publisher.SendPdpUpdateErrorResponse using p, and returns that call's error
// (nil when the call succeeded).
func sendFailureResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage error) error {
	return publisher.SendPdpUpdateErrorResponse(p, pdpUpdate, respMessage)
}
// sendPDPStatusResponse sends the response for a PDP update that carried
// policies to deploy and/or undeploy.
//
// When failureMessages is non-empty a single failure response listing them is
// sent. Otherwise a success response is sent whose text depends on which of
// the two policy lists in pdpUpdate is empty: deploy-only and undeploy-only
// updates include loggingPoliciesList, while an update with both lists
// populated gets a fixed message.
//
// It returns the error from sending the response, or nil when it was sent.
func sendPDPStatusResponse(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender, loggingPoliciesList string, failureMessages []string) error {
	// Any recorded failure turns the whole update into a failure response.
	if len(failureMessages) > 0 {
		failure := fmt.Errorf("PDP Update Failed: %v", failureMessages)
		sendErr := sendFailureResponseVar(p, &pdpUpdate, failure)
		if sendErr != nil {
			log.Warnf("Failed to send update error response: %v", sendErr)
		}
		return sendErr
	}

	switch {
	case len(pdpUpdate.PoliciesToBeUndeployed) == 0:
		// Nothing to undeploy: report the deployed policies.
		summary := fmt.Sprintf("PDP Update Successful for all policies: %v", loggingPoliciesList)
		if sendErr := sendSuccessResponseVar(p, &pdpUpdate, summary); sendErr != nil {
			log.Warnf("Failed to send update response: %v", sendErr)
			return sendErr
		}
		log.Infof("Processed policies_to_be_deployed successfully")

	case len(pdpUpdate.PoliciesToBeDeployed) == 0:
		// Nothing to deploy: report the undeployed policies.
		summary := fmt.Sprintf("PDP Update Policies undeployed :%v", loggingPoliciesList)
		if sendErr := sendSuccessResponseVar(p, &pdpUpdate, summary); sendErr != nil {
			log.Warnf("Failed to Send Update Response Message: %v", sendErr)
			return sendErr
		}
		log.Infof("Processed policies_to_be_undeployed successfully")

	default:
		// Both lists are populated: send the fixed success message.
		if sendErr := sendSuccessResponseVar(p, &pdpUpdate, "PDP UPDATE is successfull"); sendErr != nil {
			log.Warnf("Failed to Send Update Response Message: %v", sendErr)
			return sendErr
		}
	}
	return nil
}
|