aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/kafkacomm/handler
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/kafkacomm/handler')
-rw-r--r--pkg/kafkacomm/handler/pdp_update_deploy_policy.go14
-rw-r--r--pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go126
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler.go11
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler_test.go2
-rw-r--r--pkg/kafkacomm/handler/pdp_update_undeploy_policy.go195
-rw-r--r--pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go81
6 files changed, 246 insertions, 183 deletions
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
index bf56951..5d6cd93 100644
--- a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
@@ -36,6 +36,7 @@ import (
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
+ "sort"
"strings"
)
@@ -168,7 +169,7 @@ func extractAndDecodePolicies(policy model.ToscaPolicy) (map[string]string, []st
return nil, nil, err
}
- log.Debugf("Decoded policy content for key '%s': %s", key, decodedPolicy)
+ log.Tracef("Decoded policy content for key '%s': %s", key, decodedPolicy)
}
return decodedPolicies, keys, nil
@@ -228,7 +229,7 @@ func getDirName(policy model.ToscaPolicy) []string {
for key, _ := range policy.Properties.Data {
- dirNames = append(dirNames, strings.ReplaceAll(consts.Data+"/"+key, ".", "/"))
+ dirNames = append(dirNames, strings.ReplaceAll(consts.DataNode+key, ".", "/"))
}
for key, _ := range policy.Properties.Policy {
@@ -258,13 +259,14 @@ func upsertPolicy(policy model.ToscaPolicy) error {
// handles writing data to sdk.
func upsertData(policy model.ToscaPolicy) error {
decodedDataContent, dataKeys, _ := extractAndDecodeDataVar(policy)
+ sort.Sort(utils.ByDotCount{Keys: dataKeys, Ascend: true})
for _, dataKey := range dataKeys {
dataContent := decodedDataContent[dataKey]
reader := bytes.NewReader([]byte(dataContent))
decoder := json.NewDecoder(reader)
decoder.UseNumber()
- var wdata map[string]interface{}
+ var wdata interface{}
err := decoder.Decode(&wdata)
if err != nil {
log.Errorf("Failed to Insert Data: %s: %v", policy.Name, err)
@@ -365,11 +367,7 @@ func checkIfPolicyAlreadyDeployed(pdpUpdate model.PdpUpdate) []model.ToscaPolicy
// verfies policy by creating bundle.
func verifyPolicyByBundleCreation(policy model.ToscaPolicy) error {
// get directory name
- dirNames := getDirName(policy)
- if len(dirNames) == 0 {
- log.Warnf("Unable to extract folder name from policy %s", policy.Name)
- return fmt.Errorf("failed to extract folder name")
- }
+ dirNames := []string{strings.ReplaceAll(consts.DataNode+"/"+policy.Name, ".", "/"), strings.ReplaceAll(consts.Policies+"/"+policy.Name, ".", "/")}
// create bundle
output, err := createBundleFuncVar(exec.Command, policy)
if err != nil {
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
index 3e4a24a..e95bbeb 100644
--- a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
@@ -78,128 +78,6 @@ func TestValidatePackageName(t *testing.T) {
}
}
-func TestGetDirName(t *testing.T) {
- var testData = []struct {
- name string
- policy model.ToscaPolicy // Use the actual package name
- expected []string
- }{
- {
- name: "Basic valid case",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key1": "value1",
- "key2": "value2",
- },
- Policy: map[string]string{
- "policy1": "value1",
- "policy2": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key2",
- "/opt/data/key1",
- "/opt/policies/policy1",
- "/opt/policies/policy2",
- },
- },
- {
- name: "Empty policy",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{},
- Policy: map[string]string{},
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{}, // No directories expected
- },
- {
- name: "Multiple keys",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key1": "value1",
- "key2": "value2",
- },
- Policy: map[string]string{
- "policy1": "value1",
- "policy2": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key1",
- "/opt/data/key2",
- "/opt/policies/policy1",
- "/opt/policies/policy2",
- },
- },
- {
- name: "Special characters",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key.with.dot": "value1",
- },
- Policy: map[string]string{
- "policy.with.dot": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key/with/dot",
- "/opt/policies/policy/with/dot",
- },
- },
- }
- for _, tt := range testData {
- t.Run(tt.name, func(t *testing.T) {
- result := getDirName(tt.policy)
- // Check that the actual result is either nil or empty
- if len(tt.expected) == 0 {
- // They should both be empty
- assert.Empty(t, result) // Assert that result is empty
- } else {
- assert.ElementsMatch(t, tt.expected, result) // Standard equality check for non-empty scenarios
- }
- })
- }
-}
-
func TestExtractAndDecodeData(t *testing.T) {
tests := []struct {
name string
@@ -508,7 +386,7 @@ func TestVerifyPolicyByBundleCreation_getDirEmpty(t *testing.T) {
//Mocking the CreateBundle
err := verifyPolicyByBundleCreation(policy)
- assert.Error(t, err)
+ assert.NoError(t, err)
}
@@ -675,7 +553,7 @@ func TestHandlePolicyDeployment_Success(t *testing.T) {
{
Properties: model.PolicyProperties{
Data: map[string]string{
- "node.role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
+ "node.role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
},
Policy: map[string]string{
"role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
index 58ee1b0..9268115 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -79,7 +79,6 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
log.Debugf("PDP_UPDATE Message received: %s", string(message))
pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
- pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
failureMessage, successfullyDeployedPolicies := handlePolicyDeploymentVar(pdpUpdate, p)
@@ -131,7 +130,15 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
}
}
log.Infof("PDP_STATUS Message Sent Successfully")
- go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ log.Debug(pdpUpdate.PdpHeartbeatIntervalMs)
+
+ if pdpattributes.PdpHeartbeatInterval != pdpUpdate.PdpHeartbeatIntervalMs && pdpUpdate.PdpHeartbeatIntervalMs != 0 {
+ //restart the ticker.
+ publisher.StopTicker()
+ pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
+ go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ }
+
return nil
}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
index 276cffd..e7b27fd 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -634,5 +634,5 @@ func TestSendPDPStatusResponse_SimulateFailures(t *testing.T) {
}
-func TestCreateBundleFunc(t *testing.T){
+func TestCreateBundleFunc(t *testing.T) {
}
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
index 31d4554..4e72619 100644
--- a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
@@ -21,6 +21,7 @@ package handler
import (
"context"
+ "encoding/json"
"fmt"
"path/filepath"
"policy-opa-pdp/consts"
@@ -28,38 +29,41 @@ import (
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
+ "sort"
"strings"
)
type (
HandlePolicyUndeploymentFunc func(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string)
+ opasdkGetDataFunc func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error)
)
var (
- handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment
+ handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment
- removeDirectoryFunc = utils.RemoveDirectory
+ removeDirectoryFunc = utils.RemoveDirectory
- deleteDataSdkFunc = opasdk.DeleteData
+ deleteDataSdkFunc = opasdk.DeleteData
- deletePolicySdkFunc = opasdk.DeletePolicy
+ deletePolicySdkFunc = opasdk.DeletePolicy
- removeDataDirectoryFunc = removeDataDirectory
+ opasdkGetData opasdkGetDataFunc = opasdk.GetDataInfo
- removePolicyDirectoryFunc = removePolicyDirectory
+ removeDataDirectoryFunc = removeDataDirectory
- policyUndeploymentActionFunc = policyUndeploymentAction
+ removePolicyDirectoryFunc = removePolicyDirectory
- removePolicyFromSdkandDirFunc= removePolicyFromSdkandDir
+ policyUndeploymentActionFunc = policyUndeploymentAction
- removeDataFromSdkandDirFunc = removeDataFromSdkandDir
+ removePolicyFromSdkandDirFunc = removePolicyFromSdkandDir
+ removeDataFromSdkandDirFunc = removeDataFromSdkandDir
)
-
// processPoliciesTobeUndeployed handles the undeployment of policies
func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
var failureMessages []string
@@ -146,10 +150,29 @@ func removeDataFromSdkandDir(policy map[string]interface{}) []string {
var failureMessages []string
if dataKeys, ok := policy["data"].([]interface{}); ok {
+ var dataKeysSlice []string
for _, dataKey := range dataKeys {
- keyPath := "/" + strings.Replace(dataKey.(string), ".", "/", -1)
- log.Debugf("Deleting data from OPA at keypath: %s", keyPath)
- if err := deleteDataSdkFunc(context.Background(), keyPath); err != nil {
+ if strKey, ok := dataKey.(string); ok {
+ dataKeysSlice = append(dataKeysSlice, strKey)
+ } else {
+ failureMessages = append(failureMessages, fmt.Sprintf("Invalid Key :%s", dataKey))
+ }
+ }
+ sort.Sort(utils.ByDotCount{Keys: dataKeysSlice, Ascend: false})
+
+ for _, keyPath := range dataKeysSlice {
+ keyPath = "/" + strings.Replace(keyPath, ".", "/", -1)
+ log.Debugf("Deleting data from OPA : %s", keyPath)
+ // Prepare to handle any errors
+ var err error
+ var dataPath string
+ // Fetch data first
+ // Call the function to check and analyse empty parent nodes
+ if dataPath, err = analyseEmptyParentNodes(keyPath); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ }
+ if err := deleteDataSdkFunc(context.Background(), dataPath); err != nil {
+ log.Errorf("Error while deleting Data from SDK for path : %s , %v", keyPath, err.Error())
failureMessages = append(failureMessages, err.Error())
continue
}
@@ -171,6 +194,7 @@ func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
if policyKeys, ok := policy["policy"].([]interface{}); ok {
for _, policyKey := range policyKeys {
keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1)
+ log.Debugf("Deleting Policy from OPA : %s", keyPath)
if err := deletePolicySdkFunc(context.Background(), policyKey.(string)); err != nil {
failureMessages = append(failureMessages, err.Error())
continue
@@ -220,3 +244,148 @@ func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[s
}
return nil
}
+
+// analyseEmptyParentNodes constructs the parent path based on the provided dataPath.
+// It checks if any parent nodes become empty after the deletion of the last child key.
+//
+// This function takes a JSON representation of parent data and a data path,
+// splits the path into segments, and determines the eligible paths for deletion.
+//
+// If a parent node has only one child and that child is to be deleted,
+// the full path up to that parent will be returned. If no eligible parents
+// are found by the time it reaches back to the root, the original path will be returned.
+func analyseEmptyParentNodes(dataPath string) (string, error) {
+ log.Debugf("Analyzing dataPath: %s", dataPath)
+ // Split the dataPath into segments
+ pathSegments := strings.Split(dataPath, "/")
+ log.Debugf("Path segments: %+v", pathSegments)
+ // If the path does not have at least 3 segments, treat it as a leaf node
+ if len(pathSegments) < consts.SingleHierarchy {
+ log.Debugf("Path doesn't have any parent-child hierarchy;so returning the original path: %s", dataPath)
+ return dataPath, nil // It's a leaf node or too short; return the original path
+ }
+ // Prepare the parent path which is derived from the second segment
+ parentKeyPath := "/" + pathSegments[1] // Assuming the immediate parent node
+ log.Debugf("Detected parent path: %s", parentKeyPath)
+ // Fetch the data for the detected parent path
+ parentData, err := opasdkGetData(context.Background(), parentKeyPath)
+ if err != nil {
+ return "", fmt.Errorf("failed to get data for parent path %s: %w", parentKeyPath, err)
+ }
+ // Unmarshal parent data JSON into a map for analysis
+ parentDataJson, err := json.Marshal(parentData)
+ if err != nil {
+ return "", fmt.Errorf("failed to marshal parent data: %w", err)
+ }
+ // Call the method to analyze the hierarchy
+ return analyzeHierarchy(parentDataJson, dataPath)
+}
+
+// analyzeHierarchy examines the provided data path against the JSON structure to determine
+// the last eligible path for deletion based on parent-child relationships.
+//
+// The function takes a JSON object in raw format and splits the data path into segments.
+// Starting from the last key, it checks each parent key to see if it has only one child.
+// If so, it marks the path up to that parent as the last eligible path for deletion.
+func analyzeHierarchy(parentDataJson json.RawMessage, dataPath string) (string, error) {
+ // Create a map to hold the parent data
+ parentMap := make(map[string]interface{})
+
+ // Unmarshal the fetched JSON data into the parentMap
+ if err := json.Unmarshal(parentDataJson, &parentMap); err != nil {
+ return "", fmt.Errorf("error unmarshalling parent data: %w", err)
+ }
+
+ // Count keys in the JSON structure
+ countMap := countChildKeysFromJSON(parentMap)
+ // Split keys and omit the first empty element
+ keys := strings.Split(dataPath, "/")[1:]
+ // Default to the input path
+ lastEligible := dataPath
+ // Traverse the path from the last key to the first key
+ // Start from the last segment and stop at the first parent
+ for indexfromKeyPath := len(keys) - 1; indexfromKeyPath >= 1; indexfromKeyPath-- {
+ // Identify the parent of the current path
+ currentPath := strings.Join(keys[:indexfromKeyPath], "/")
+ // Checking counts of the parent key
+ childCount := countMap[currentPath]
+ if childCount == 1 {
+ // If parent has only 1 child after deletion, it is eligible
+ lastEligible = "/" + currentPath // Store the path up to this parent
+ } else {
+ break
+ }
+ }
+
+ log.Debugf("lastEligible Path: %+v", lastEligible)
+ return lastEligible, nil
+
+}
+
+// countChildKeysFromJSON counts the number of child keys for each key in a JSON structure represented as a map.
+//
+// This function traverses the provided JSON map iteratively using a stack, counting
+// the number of direct children for each key. The counts are stored in a map where
+// the keys represent the paths in the JSON hierarchy (using slash notation) and the
+// values indicate how many children each key has.
+// Example Inputs and Outputs:
+//
+// Given the following JSON:
+// {
+// "node": {
+// "collab": {
+// "action": {
+// "conflict": {},
+// "others": {}
+// },
+// "role": {}
+// },
+// "role": {
+// "role_grants": {
+// "billing": {},
+// "shipping": {}
+// }
+// }
+// }
+// }
+// Example Output:
+// {
+// "node": 2,
+// "node/collab": 2,
+// "node/collab/action": 2,
+// "node/collab/role": 0,
+// "node/role": 1,
+// "node/role/role_grants": 2,
+// "node/role/role_grants/billing": 0,
+// "node/role/role_grants/shipping": 0
+// }
+func countChildKeysFromJSON(data map[string]interface{}) map[string]int {
+ countMap := make(map[string]int)
+
+ // Creating a stack for iterative traversal with paths
+ stack := []struct {
+ current map[string]interface{}
+ path string
+ }{
+ {data, "node"}, // Start with the root node path
+ }
+
+ for len(stack) > 0 {
+ // Pop the current map from the stack
+ top := stack[len(stack)-1]
+ stack = stack[:len(stack)-1]
+ for key, value := range top.current {
+ // take the full path
+ currentPath := top.path + "/" + key
+ if childMap, ok := value.(map[string]interface{}); ok {
+ // Count the number of children for each key
+ countMap[currentPath] = len(childMap)
+ stack = append(stack, struct {
+ current map[string]interface{}
+ path string
+ }{childMap, currentPath}) // Push children map into stack with full path
+ }
+ }
+ }
+ return countMap
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
index f725f4b..08aed34 100644
--- a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
@@ -20,13 +20,14 @@
package handler
import (
- // "encoding/json"
"context"
+ "encoding/json"
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/policymap"
"testing"
)
@@ -346,6 +347,18 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
}()
// Mock removeDataDirectoryFunc and deleteDataFunc to return errors for testing
+ opasdkGetData = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+ // Mock JSON data
+ mockedData := `{"mocked": {"success": "value", "error": "value"}}`
+ // Create an instance of OPADataResponse_Data
+ var response oapicodegen.OPADataResponse_Data
+ // Unmarshal into the OPADataResponse_Data struct
+ err = json.Unmarshal([]byte(mockedData), &response)
+ if err != nil {
+ return nil, errors.New("Error unmarshalling")
+ }
+ return &response, nil //
+ }
removeDataDirectoryFunc = func(dataKey string) error {
if dataKey == "/mocked/error" {
return errors.New("mocked remove data directory error")
@@ -370,42 +383,40 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
assert.Contains(t, failures[0], "mocked delete data error")
}
-
func TestRemovePolicyFromSdkandDir(t *testing.T) {
- // Backup original functions
- originalRemovePolicyDirectory := removePolicyDirectoryFunc
- originalDeletePolicy := deletePolicySdkFunc
- defer func() {
- removePolicyDirectoryFunc = originalRemovePolicyDirectory // Restore after test
- deletePolicySdkFunc = originalDeletePolicy // Restore after test
- }()
-
- // Mock functions
- removePolicyDirectoryFunc = func(policyKey string) error {
- if policyKey == "/mocked/error" {
- return errors.New("mocked remove policy directory error")
- }
- return nil
- }
-
- deletePolicySdkFunc = func(ctx context.Context, policyPath string) error {
- if policyPath == "mocked.error" {
- return errors.New("mocked delete policy error")
- }
- return nil
- }
-
- policy := map[string]interface{}{
- "policy": []interface{}{"mocked.success", "mocked.error"}, // VALID policy key
- }
-
- failures := removePolicyFromSdkandDir(policy)
-
- // Expecting 1 error message (for "mocked.error"), "mocked.success" should pass
- assert.Len(t, failures, 1)
- assert.Contains(t, failures[0], "mocked delete policy error")
-}
+ // Backup original functions
+ originalRemovePolicyDirectory := removePolicyDirectoryFunc
+ originalDeletePolicy := deletePolicySdkFunc
+ defer func() {
+ removePolicyDirectoryFunc = originalRemovePolicyDirectory // Restore after test
+ deletePolicySdkFunc = originalDeletePolicy // Restore after test
+ }()
+
+ // Mock functions
+ removePolicyDirectoryFunc = func(policyKey string) error {
+ if policyKey == "/mocked/error" {
+ return errors.New("mocked remove policy directory error")
+ }
+ return nil
+ }
+
+ deletePolicySdkFunc = func(ctx context.Context, policyPath string) error {
+ if policyPath == "mocked.error" {
+ return errors.New("mocked delete policy error")
+ }
+ return nil
+ }
+ policy := map[string]interface{}{
+ "policy": []interface{}{"mocked.success", "mocked.error"}, // VALID policy key
+ }
+
+ failures := removePolicyFromSdkandDir(policy)
+
+ // Expecting 1 error message (for "mocked.error"), "mocked.success" should pass
+ assert.Len(t, failures, 1)
+ assert.Contains(t, failures[0], "mocked delete policy error")
+}
// Mocking the remove functions
var (