aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/data/data-handler.go179
-rw-r--r--pkg/data/data-handler_test.go50
-rw-r--r--pkg/decision/decision-provider.go154
-rw-r--r--pkg/decision/decision-provider_test.go465
-rw-r--r--pkg/kafkacomm/handler/pdp_update_deploy_policy.go14
-rw-r--r--pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go126
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler.go11
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler_test.go2
-rw-r--r--pkg/kafkacomm/handler/pdp_update_undeploy_policy.go195
-rw-r--r--pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go81
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer.go29
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer_test.go18
-rw-r--r--pkg/kafkacomm/pdp_topic_producer.go8
-rw-r--r--pkg/kafkacomm/pdp_topic_producer_test.go2
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat.go26
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat_test.go44
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration_test.go5
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher_test.go116
-rw-r--r--pkg/metrics/counters.go58
-rw-r--r--pkg/metrics/counters_test.go12
-rw-r--r--pkg/metrics/statistics-provider.go13
-rw-r--r--pkg/metrics/statistics-provider_test.go4
-rw-r--r--pkg/model/messages_test.go4
-rw-r--r--pkg/model/oapicodegen/models.go91
-rw-r--r--pkg/opasdk/opasdk.go19
-rw-r--r--pkg/opasdk/opasdk_test.go8
-rw-r--r--pkg/pdpattributes/pdpattributes.go6
-rw-r--r--pkg/pdpattributes/pdpattributes_test.go8
-rw-r--r--pkg/utils/sort.go41
-rw-r--r--pkg/utils/utils.go215
-rw-r--r--pkg/utils/utils_test.go149
31 files changed, 1436 insertions, 717 deletions
diff --git a/pkg/data/data-handler.go b/pkg/data/data-handler.go
index b571010..673f247 100644
--- a/pkg/data/data-handler.go
+++ b/pkg/data/data-handler.go
@@ -21,8 +21,10 @@ package data
import (
"context"
"encoding/json"
+ "fmt"
"github.com/google/uuid"
openapi_types "github.com/oapi-codegen/runtime/types"
+ "github.com/open-policy-agent/opa/storage"
"net/http"
"path/filepath"
"policy-opa-pdp/consts"
@@ -30,11 +32,9 @@ import (
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
"strings"
-
- "github.com/open-policy-agent/opa/storage"
- "policy-opa-pdp/pkg/policymap"
)
var (
@@ -78,60 +78,30 @@ func createOPADataUpdateExceptionResponse(statusCode int, errorMessage string, p
}
}
-// Validate OPADataUpdateRequest function
-func validateOPADataUpdateRequest(request *oapicodegen.OPADataUpdateRequest) []string {
- var validationErrors []string
-
- // Check if required fields are populated
- dateString := (request.CurrentDate).String()
- if !(utils.IsValidCurrentDate(&dateString)) {
- validationErrors = append(validationErrors, "CurrentDate is required")
- }
-
- // Validate CurrentDateTime format
- if !(utils.IsValidTime(request.CurrentDateTime)) {
- validationErrors = append(validationErrors, "CurrentDateTime is invalid or missing")
- }
-
- // Validate CurrentTime format
- if !(utils.IsValidCurrentTime(request.CurrentTime)) {
- validationErrors = append(validationErrors, "CurrentTime is invalid or missing")
- }
-
- // Validate Data field (ensure it's not nil and has items)
- if !(utils.IsValidData(request.Data)) {
- validationErrors = append(validationErrors, "Data is required and cannot be empty")
- }
-
- // Validate TimeOffset format (e.g., +02:00 or -05:00)
- if !(utils.IsValidTimeOffset(request.TimeOffset)) {
- validationErrors = append(validationErrors, "TimeOffset is invalid or missing")
- }
-
- // Validate TimeZone format (e.g., 'America/New_York')
- if !(utils.IsValidTimeZone(request.TimeZone)) {
- validationErrors = append(validationErrors, "TimeZone is invalid or missing")
- }
-
- // Optionally, check if 'OnapComponent', 'OnapInstance', 'OnapName', and 'PolicyName' are provided
- if !(utils.IsValidString(request.OnapComponent)) {
- validationErrors = append(validationErrors, "OnapComponent is required")
- }
+type Policy struct {
+ Data []string `json:"data"`
+ Policy []string `json:"policy"`
+ PolicyID string `json:"policy-id"`
+ PolicyVersion string `json:"policy-version"`
+}
- if !(utils.IsValidString(request.OnapInstance)) {
- validationErrors = append(validationErrors, "OnapInstance is required")
+// Function to extract the policy by policyId
+func getPolicyByID(policiesMap string, policyId string) (*Policy, error) {
+ var policies struct {
+ DeployedPolicies []Policy `json:"deployed_policies_dict"`
}
- if !(utils.IsValidString(request.OnapName)) {
- validationErrors = append(validationErrors, "OnapName is required")
+ if err := json.Unmarshal([]byte(policiesMap), &policies); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal policies: %v", err)
}
- if !(utils.IsValidString(request.PolicyName)) {
- validationErrors = append(validationErrors, "PolicyName is required and cannot be empty")
+ for _, policy := range policies.DeployedPolicies {
+ if policy.PolicyID == policyId {
+ return &policy, nil
+ }
}
- // Return all validation errors (if any)
- return validationErrors
+ return nil, fmt.Errorf("policy '%s' not found", policyId)
}
func patchHandler(res http.ResponseWriter, req *http.Request) {
@@ -144,13 +114,18 @@ func patchHandler(res http.ResponseWriter, req *http.Request) {
log.Errorf(errMsg)
return
}
- path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data/")
+ path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data")
dirParts := strings.Split(path, "/")
dataDir := filepath.Join(dirParts...)
log.Infof("dataDir : %s", dataDir)
// Validate the request
- validationErrors := validateOPADataUpdateRequest(&requestBody)
+ validationErrors := utils.ValidateOPADataRequest(&requestBody)
+
+ // Validate Data field (ensure it's not nil and has items)
+ if !(utils.IsValidData(requestBody.Data)) {
+ validationErrors = append(validationErrors, "Data is required and cannot be empty")
+ }
// Print validation errors
if len(validationErrors) > 0 {
@@ -159,7 +134,7 @@ func patchHandler(res http.ResponseWriter, req *http.Request) {
sendErrorResponse(res, errMsg, http.StatusBadRequest)
return
} else {
- log.Errorf("All fields are valid!")
+ log.Debug("All fields are valid!")
// Access the data part
data := requestBody.Data
log.Infof("data : %s", data)
@@ -172,10 +147,46 @@ func patchHandler(res http.ResponseWriter, req *http.Request) {
log.Errorf(errMsg)
return
}
+
+ // Checking if the data operation is performed for a deployed policy with policymap.CheckIfPolicyAlreadyExists and getPolicyByID
+ // if a match is found, we will join the url path with dots and check for the data key from the policiesMap whether utl path is a
+ // prefix of data key. we will proceed for Patch Operation if this matches, else return error
+ if len(dirParts) > 0 && dirParts[0] == "" {
+ dirParts = dirParts[1:]
+ }
+ finalDirParts := strings.Join(dirParts, ".")
+
+ policiesMap := policymap.LastDeployedPolicies
+
+ matchedPolicy, err := getPolicyByID(policiesMap, *policyId)
+ if err != nil {
+ sendErrorResponse(res, err.Error(), http.StatusBadRequest)
+ log.Errorf(err.Error())
+ return
+ }
+
+ log.Infof("Matched policy: %+v", matchedPolicy)
+
+ // Check if finalDirParts starts with any data key
+ matchFound := false
+ for _, dataKey := range matchedPolicy.Data {
+ if strings.HasPrefix(finalDirParts, dataKey) {
+ matchFound = true
+ break
+ }
+ }
+
+ if !matchFound {
+ errMsg := fmt.Sprintf("Dynamic Data add/replace/remove for policy '%s' expected under url path '%v'", *policyId, matchedPolicy.Data)
+ sendErrorResponse(res, errMsg, http.StatusBadRequest)
+ log.Errorf(errMsg)
+ return
+ }
+
if err := patchData(dataDir, data, res); err != nil {
- // Handle the error, for example, log it or return an appropriate response
- log.Errorf("Error encoding JSON response: %s", err)
- }
+ // Handle the error, for example, log it or return an appropriate response
+ log.Errorf("Error encoding JSON response: %s", err)
+ }
}
}
@@ -215,7 +226,7 @@ func extractPatchInfo(res http.ResponseWriter, ops *[]map[string]interface{}, ro
// PATCH request with add or replace opType, MUST contain a "value" member whose content specifies the value to be added / replaced. For remove opType, value does not required
if optypeString == "add" || optypeString == "replace" {
value, valueErr = op["value"]
- if !valueErr {
+ if !valueErr || isEmpty(value) {
valueErrMsg := "Error in getting data value. Value is not given in request body"
sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
log.Errorf(valueErrMsg)
@@ -225,7 +236,7 @@ func extractPatchInfo(res http.ResponseWriter, ops *[]map[string]interface{}, ro
impl.Value = value
opPath, opPathErr := op["path"].(string)
- if !opPathErr {
+ if !opPathErr || len(opPath) == 0 {
opPathErrMsg := "Error in getting data path. Path is not given in request body"
sendErrorResponse(res, opPathErrMsg, http.StatusInternalServerError)
log.Errorf(opPathErrMsg)
@@ -243,6 +254,35 @@ func extractPatchInfo(res http.ResponseWriter, ops *[]map[string]interface{}, ro
return result
}
+func isEmpty(data interface{}) bool {
+ if data == nil {
+ return true // Nil values are considered empty
+ }
+
+ switch v := data.(type) {
+ case string:
+ return len(v) == 0 // Check if string is empty
+ case []interface{}:
+ return len(v) == 0 // Check if slice is empty
+ case map[string]interface{}:
+ return len(v) == 0 // Check if map is empty
+ case []byte:
+ return len(v) == 0 // Check if byte slice is empty
+ case int, int8, int16, int32, int64:
+ return v == 0 // Zero integers are considered empty
+ case uint, uint8, uint16, uint32, uint64:
+ return v == 0 // Zero unsigned integers are considered empty
+ case float32, float64:
+ return v == 0.0 // Zero floats are considered empty
+ case bool:
+ return !v // `false` is considered empty
+ case nil:
+ return true // Explicitly checking nil again
+ default:
+ return false // Other data types are not considered empty
+ }
+}
+
func constructPath(opPath string, opType string, root string, res http.ResponseWriter) (storagePath storage.Path) {
// Construct patch path.
log.Debugf("root: %s", root)
@@ -269,13 +309,10 @@ func constructPath(opPath string, opType string, root string, res http.ResponseW
path = root + "/" + path
}
} else {
- if opType == "remove" {
- valueErrMsg := "Error in getting data path - Invalid path (/) is used."
- sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
- log.Errorf(valueErrMsg)
- return nil
- }
- path = root
+ valueErrMsg := "Error in getting data path - Invalid path (/) is used."
+ sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
+ log.Errorf(valueErrMsg)
+ return nil
}
log.Infof("calling ParsePatchPathEscaped to check the path")
@@ -386,7 +423,13 @@ func getDataInfo(res http.ResponseWriter, req *http.Request) {
constructResponseHeader(res, req)
urlPath := req.URL.Path
- dataPath := strings.ReplaceAll(urlPath, "/policy/pdpo/v1/data", "")
+
+ dataPath := strings.TrimPrefix(urlPath, "/policy/pdpo/v1/data")
+
+ if len(strings.TrimSpace(dataPath)) == 0 {
+ // dataPath "/" is used to get entire data
+ dataPath = "/"
+ }
log.Debugf("datapath to get Data : %s\n", dataPath)
getData(res, dataPath)
@@ -420,7 +463,7 @@ func getData(res http.ResponseWriter, dataPath string) {
res.WriteHeader(http.StatusOK)
if err := json.NewEncoder(res).Encode(dataResponse); err != nil {
- // Handle the error, for example, log it or return an appropriate response
- log.Errorf("Error encoding JSON response: %s", err)
+ // Handle the error, for example, log it or return an appropriate response
+ log.Errorf("Error encoding JSON response: %s", err)
}
}
diff --git a/pkg/data/data-handler_test.go b/pkg/data/data-handler_test.go
index 41be361..e8dcb17 100644
--- a/pkg/data/data-handler_test.go
+++ b/pkg/data/data-handler_test.go
@@ -56,56 +56,6 @@ func TestGetErrorResponseCodeForOPADataUpdate(t *testing.T) {
}
}
-func TestValidateOPADataUpdateRequest(t *testing.T) {
- ctime := "12:00:00"
- timeZone := "America_New_York"
- timeOffset := "$02:00"
- onapComp := " "
- onapIns := " "
- onapName := " "
- policyName := " "
- var currentDate openapi_types.Date
- currentDate = openapi_types.Date{}
- var currentDateTime time.Time
- currentDateTime = time.Time{}
-
- var data []map[string]interface{}
-
- data = nil
-
- inValidRequest := &oapicodegen.OPADataUpdateRequest{
- CurrentDate: &currentDate,
- CurrentDateTime: &currentDateTime,
- CurrentTime: &ctime,
- TimeOffset: &timeOffset,
- TimeZone: &timeZone,
- OnapComponent: &onapComp,
- OnapInstance: &onapIns,
- OnapName: &onapName,
- PolicyName: &policyName,
- Data: &data,
- }
-
- inValidErr := []string{"CurrentTime is invalid or missing", "Data is required and cannot be empty", "TimeOffset is invalid or missing", "TimeZone is invalid or missing", "OnapComponent is required", "OnapInstance is required", "OnapName is required", "PolicyName is required and cannot be empty"}
-
- tests := []struct {
- name string
- request *oapicodegen.OPADataUpdateRequest
- expectedErr []string
- }{
- {"Valid Request", inValidRequest, inValidErr},
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- errors := validateOPADataUpdateRequest(tt.request)
- fmt.Printf("error : %s", errors)
- fmt.Printf("error len : %d", len(errors))
- assert.Equal(t, tt.expectedErr, errors)
- })
- }
-}
-
func TestPatchHandler_InvalidJSON(t *testing.T) {
req := httptest.NewRequest("PATCH", "/policy/pdpo/v1/data/", bytes.NewBuffer([]byte("{invalid_json}")))
res := httptest.NewRecorder()
diff --git a/pkg/decision/decision-provider.go b/pkg/decision/decision-provider.go
index 12896c3..035fb0f 100644
--- a/pkg/decision/decision-provider.go
+++ b/pkg/decision/decision-provider.go
@@ -38,6 +38,7 @@ import (
"policy-opa-pdp/pkg/pdpstate"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
+ "sort"
"strings"
)
@@ -77,8 +78,8 @@ func writeErrorJSONResponse(res http.ResponseWriter, status int, errorDescriptio
// creates a success decision response
func createSuccessDecisionResponse(policyName string, output map[string]interface{}) *oapicodegen.OPADecisionResponse {
return &oapicodegen.OPADecisionResponse{
- PolicyName: policyName,
- Output: output,
+ PolicyName: policyName,
+ Output: output,
}
}
@@ -135,17 +136,21 @@ func handleDecisionRequest(res http.ResponseWriter, req *http.Request, errorDtls
return
}
- if decisionReq.PolicyName == "" {
- *errorDtls = "Policy Name is nil which is invalid."
- *httpStatus = http.StatusBadRequest
- return
- }
+ // Validate the request body
+ validationErrors := utils.ValidateOPADataRequest(decisionReq)
if decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0 {
- *errorDtls = "Policy Filter is nil."
+ validationErrors = append(validationErrors, "PolicyFilter is required")
+ }
+ if len(validationErrors) > 0 {
+ *errorDtls = strings.Join(validationErrors, ", ")
+ log.Errorf("Facing validation error in requestbody - %s", *errorDtls)
*httpStatus = http.StatusBadRequest
return
}
+ log.Debugf("Validation successful for request fields")
+ // If validation passes, handle the decision request
+
decisionReq.PolicyName = strings.ReplaceAll(decisionReq.PolicyName, ".", "/")
handlePolicyValidation(res, decisionReq, errorDtls, httpStatus, policyId)
}
@@ -195,7 +200,7 @@ func policyExists(policyName string, extractedPolicies []model.ToscaConceptIdent
return false
}
-//This function processes the request headers
+// This function processes the request headers
func processRequestHeaders(req *http.Request, res http.ResponseWriter) (string, *oapicodegen.DecisionParams) {
requestId := req.Header.Get("X-ONAP-RequestID")
var parsedUUID *uuid.UUID
@@ -229,7 +234,7 @@ func isSystemActive() bool {
return pdpstate.GetCurrentState() == model.Active
}
-//This method parses the body and checks whether it is properly formatted JSON or not
+// This method parses the body and checks whether it is properly formatted JSON or not
func parseRequestBody(req *http.Request) (*oapicodegen.OPADecisionRequest, error) {
var decisionReq oapicodegen.OPADecisionRequest
if err := json.NewDecoder(req.Body).Decode(&decisionReq); err != nil {
@@ -238,7 +243,7 @@ func parseRequestBody(req *http.Request) (*oapicodegen.OPADecisionRequest, error
return &decisionReq, nil
}
-//This function sends the error response
+// This function sends the error response
func sendDecisionErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) {
log.Warnf("%s", msg)
decisionExc := createDecisionExceptionResponse(httpStatus, msg, policyName)
@@ -247,29 +252,33 @@ func sendDecisionErrorResponse(msg string, res http.ResponseWriter, httpStatus i
writeErrorJSONResponse(res, httpStatus, msg, *decisionExc)
}
-
type OPASingletonInstanceFunc func() (*sdk.OPA, error)
+
var OPASingletonInstance OPASingletonInstanceFunc = opasdk.GetOPASingletonInstance
-//This function returns the opasdk instance
+// This function returns the opasdk instance
func getOpaInstance() (*sdk.OPA, error) {
return OPASingletonInstance()
}
-
-
type OPADecisionFunc func(opa *sdk.OPA, ctx context.Context, options sdk.DecisionOptions) (*sdk.DecisionResult, error)
+
var OPADecision OPADecisionFunc = (*sdk.OPA).Decision
-//This function processes the OPA decision
+// This function processes the OPA decision
func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapicodegen.OPADecisionRequest) {
ctx := context.Background()
log.Debugf("SDK making a decision")
- var decisionRes *oapicodegen.OPADecisionResponse
+ var decisionRes *oapicodegen.OPADecisionResponse
//OPA is seding success with a warning message if "input" parameter is missing, so we need to send success response
- if (decisionReq.Input == nil) {
- statusMessage := "{\"warning\":{\"code\":\"api_usage_warning\",\"message\":\"'input' key missing from the request\"}}"
- decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, nil, statusMessage)
+ inputBytes, err := json.Marshal(decisionReq.Input)
+ if err != nil {
+ log.Warnf("Failed to unmarshal decision Request Input: %vg", err)
+ return
+ }
+ if inputBytes == nil || len(inputBytes) == 0 {
+ statusMessage := "{\"warning\":{\"code\":\"api_usage_warning\",\"message\":\"'input' key missing from the request\"}}"
+ decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, nil, statusMessage)
} else {
options := sdk.DecisionOptions{Path: decisionReq.PolicyName, Input: decisionReq.Input}
decisionResult, decisionErr := OPADecision(opa, ctx, options)
@@ -280,20 +289,22 @@ func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapi
}
log.Debugf("RAW opa Decision output:\n%s\n", string(jsonOutput))
+ //while making decision . is replaced by /. reverting back.
+ decisionReq.PolicyName = strings.ReplaceAll(decisionReq.PolicyName, "/", ".")
+
if decisionErr != nil {
- handleOpaDecisionError(res, decisionErr, decisionReq.PolicyName)
+ sendDecisionErrorResponse(decisionErr.Error(), res, http.StatusInternalServerError, decisionReq.PolicyName)
return
}
-
var policyFilter []string
if decisionReq.PolicyFilter != nil {
policyFilter = decisionReq.PolicyFilter
}
result, _ := decisionResult.Result.(map[string]interface{})
- outputMap, unmatchedFilters := processPolicyFilter(result, policyFilter)
+ outputMap, unmatchedFilters, validPolicyFilters := processPolicyFilter(result, policyFilter)
if len(unmatchedFilters) > 0 {
- message := fmt.Sprintf("Policy Filter(s) not matching: [%s]", strings.Join(unmatchedFilters, ", "))
+ message := fmt.Sprintf("Policy Filter(s) not matching, Valid Filter(s) are: [%s]", strings.Join(validPolicyFilters, ", "))
decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, message)
} else {
decisionRes = createSuccessDecisionResponse(decisionReq.PolicyName, outputMap)
@@ -303,52 +314,77 @@ func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapi
writeOpaJSONResponse(res, http.StatusOK, *decisionRes)
}
-//This function validates the errors during decision process
-func handleOpaDecisionError(res http.ResponseWriter, err error, policyName string) {
- //As per the opa documentation in https://www.openpolicyagent.org/docs/latest/rest-api/#get-a-document-with-input
- //when the path refers to an undefined document it will return 200 with no result.
- //opasdk is returning opa_undefined_error for such case, so need to give sucess for such case and
- //for other cases we have to send error response
- if strings.Contains(err.Error(), string(oapicodegen.OpaUndefinedError)) {
- decisionExc := createSuccessDecisionResponse(policyName, nil)
- metrics.IncrementDecisionSuccessCount()
- writeOpaJSONResponse(res, http.StatusOK, *decisionExc)
- } else {
- sendDecisionErrorResponse(err.Error(), res, http.StatusInternalServerError, policyName)
- }
-}
-
-//This function processes the policy filters
-func processPolicyFilter(result map[string]interface{}, policyFilter []string) (map[string]interface{}, []string) {
+// This function processes the policy filters
+func processPolicyFilter(result map[string]interface{}, policyFilter []string) (map[string]interface{}, []string, []string) {
if len(policyFilter) > 0 {
- filteredResult, unmatchedFilters := applyPolicyFilter(result, policyFilter)
+ filteredResult, unmatchedFilters, validfilters := applyPolicyFilter(result, policyFilter)
if len(filteredResult) > 0 {
- return filteredResult, unmatchedFilters
+ return filteredResult, unmatchedFilters, validfilters
}
}
- return nil, policyFilter
+ return nil, policyFilter, getValidPolicyFilters(result)
}
-// Function to apply policy filter to decision result
-func applyPolicyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string) {
+// Get Valid Filters and collects unmatched filters
+func applyPolicyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string, []string) {
filteredOutput := make(map[string]interface{})
- unmatchedFilters := make(map[string]struct{})
+ unmatchedFilters := []string{}
+
+ validFilters := getValidPolicyFilters(result)
for _, filter := range filters {
- unmatchedFilters[filter] = struct{}{}
+ if filter == "" {
+ // when filter is "" empty, the entire resultant data will be reported
+ return result, nil, validFilters
+ }
+ // Try to find the value in the result map
+ if value := findNestedValue(result, strings.Split(filter, "/")); value != nil {
+ filteredOutput[filter] = value // Store using full path
+ } else if value, exists := result[filter]; exists {
+ // Allow direct key match (for non-nested filters)
+ filteredOutput[filter] = value
+ } else {
+ unmatchedFilters = append(unmatchedFilters, filter) // Collect unmatched filters
+ }
}
- for key, value := range result {
- for _, filter := range filters {
- if (key == filter || strings.TrimSpace(filter) == "") {
- filteredOutput[key] = value
- delete(unmatchedFilters, filter)
- }
- }
+
+ return filteredOutput, unmatchedFilters, validFilters
+}
+
+// handles the nested Policy Filters available when multiple rego files are included.
+func findNestedValue(opaSdkResult map[string]interface{}, keys []string) interface{} {
+ if len(keys) == 0 {
+ return nil
}
+ currentMap := opaSdkResult
- unmatchedList := make([]string, 0, len(unmatchedFilters))
- for filter := range unmatchedFilters {
- unmatchedList = append(unmatchedList, filter)
+ for _, key := range keys {
+ value, exists := currentMap[key]
+ if !exists {
+ return nil // Key doesn't exist
+ }
+
+ // If it's a nested map, continue traversal
+ if nextNestedMap, ok := value.(map[string]interface{}); ok {
+ currentMap = nextNestedMap
+ } else {
+ return value // Return final value (non-map)
+ }
}
+ return currentMap
+}
- return filteredOutput, unmatchedList
+// returns the valid Policy Filters available
+func getValidPolicyFilters(opaSdkResult map[string]interface{}) []string {
+ keys := make([]string, 0)
+
+ for k, v := range opaSdkResult {
+ keys = append(keys, k)
+ if nestedMap, ok := v.(map[string]interface{}); ok {
+ for nestedKey := range nestedMap {
+ keys = append(keys, k+"/"+nestedKey)
+ }
+ }
+ }
+ sort.Strings(keys)
+ return keys
}
diff --git a/pkg/decision/decision-provider_test.go b/pkg/decision/decision-provider_test.go
index ad95522..387d07a 100644
--- a/pkg/decision/decision-provider_test.go
+++ b/pkg/decision/decision-provider_test.go
@@ -25,7 +25,9 @@ import (
"encoding/json"
"errors"
"fmt"
+ openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/open-policy-agent/opa/sdk"
+ "github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"os"
@@ -35,10 +37,10 @@ import (
"policy-opa-pdp/pkg/pdpstate"
"policy-opa-pdp/pkg/policymap"
"testing"
- "github.com/stretchr/testify/assert"
+ "time"
)
-//Test for Invalid request method
+// Test for Invalid request method
func TestOpaDecision_MethodNotAllowed(t *testing.T) {
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
@@ -54,7 +56,7 @@ func TestOpaDecision_MethodNotAllowed(t *testing.T) {
assert.Contains(t, rec.Body.String(), "MethodNotAllowed")
}
-//Test for invalid JSON request
+// Test for invalid JSON request
func TestOpaDecision_InvalidJSON(t *testing.T) {
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
@@ -69,53 +71,135 @@ func TestOpaDecision_InvalidJSON(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
-//Test for Missing Policy
+// Test for Missing Policy
func TestOpaDecision_MissingPolicyPath(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyFilter := []string{"filter1", "filter2"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- body := map[string]interface{}{"onapName": "CDS", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
- jsonBody, _ := json.Marshal(body)
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyFilter: policyFilter,
+ }
+
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
rec := httptest.NewRecorder()
OpaDecision(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
- assert.Contains(t, rec.Body.String(), "Policy Name is nil which is invalid")
+ assert.Contains(t, rec.Body.String(), "PolicyName is required and cannot be empty")
}
-//Test for Missing Policy Filter
+// Test for Missing Policy Filter
func TestOpaDecision_MissingPolicyFilter(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "ONAP"
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- body := map[string]interface{}{"onapName": "CDS", "policyName": "datapolicy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
-
- jsonBody, _ := json.Marshal(body)
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ }
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
rec := httptest.NewRecorder()
OpaDecision(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
- assert.Contains(t, rec.Body.String(), "Policy Filter is nil")
+ assert.Contains(t, rec.Body.String(), "PolicyFilter is required")
}
-//Test for OPA Instance Error
+// Test for OPA Instance Error
func TestOpaDecision_GetInstanceError(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "data.policy"
+ policyFilter := []string{"filter1", "filter2"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- body := map[string]interface{}{"policy": "data.policy"}
- jsonBody, _ := json.Marshal(body)
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
rec := httptest.NewRecorder()
@@ -124,15 +208,44 @@ func TestOpaDecision_GetInstanceError(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
-//Test for OPA decision Error
+// Test for OPA decision Error
func TestOpaDecision_OPADecisionError(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "data.policy"
+ policyFilter := []string{"filter1", "filter2"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- body := map[string]interface{}{"policy": "data.policy"}
- jsonBody, _ := json.Marshal(body)
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
rec := httptest.NewRecorder()
@@ -149,7 +262,7 @@ func TestOpaDecision_OPADecisionError(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
-//Test for system in passive State
+// Test for system in passive State
func TestOpaDecision_PassiveState(t *testing.T) {
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
@@ -174,6 +287,7 @@ func ptrString(s string) string {
func ptrStringEx(s string) *string {
return &s
}
+
// Utility function to return a pointer to a map
func ptrMap(m map[string]interface{}) map[string]interface{} {
return m
@@ -184,8 +298,8 @@ func TestWriteOpaJSONResponse(t *testing.T) {
rec := httptest.NewRecorder()
data := &oapicodegen.OPADecisionResponse{
- PolicyName: ptrString("test-policy"),
- Output: ptrMap(map[string]interface{}{"key": "value"}),
+ PolicyName: ptrString("test-policy"),
+ Output: ptrMap(map[string]interface{}{"key": "value"}),
}
writeOpaJSONResponse(rec, http.StatusOK, *data)
@@ -194,7 +308,7 @@ func TestWriteOpaJSONResponse(t *testing.T) {
assert.Contains(t, rec.Body.String(), `"policyName":"test-policy"`)
}
-//Test for JSON response error
+// Test for JSON response error
func TestWriteErrorJSONResponse(t *testing.T) {
rec := httptest.NewRecorder()
@@ -209,7 +323,7 @@ func TestWriteErrorJSONResponse(t *testing.T) {
assert.Contains(t, rec.Body.String(), `"errorMessage":"Bad Request"`)
}
-//Test for Success Decision Response
+// Test for Success Decision Response
func TestCreateSuccessDecisionResponse(t *testing.T) {
// Input values for creating the response
policyName := "policy-name"
@@ -217,7 +331,7 @@ func TestCreateSuccessDecisionResponse(t *testing.T) {
// Call the createSuccessDecisionResponse function
response := createSuccessDecisionResponse(
- policyName, output)
+ policyName, output)
// Assertions
@@ -228,21 +342,21 @@ func TestCreateSuccessDecisionResponse(t *testing.T) {
assert.Equal(t, response.Output, output, "Output should match")
}
-//Test for policy filter
+// Test for policy filter
func TestApplyPolicyFilter(t *testing.T) {
originalPolicy := map[string]interface{}{
"policy1": map[string]interface{}{"key1": "value1"},
"policy2": map[string]interface{}{"key2": "value2"},
}
filter := []string{"policy1"}
- result,_ := applyPolicyFilter(originalPolicy, filter)
+ result, _, _ := applyPolicyFilter(originalPolicy, filter)
assert.NotNil(t, result)
assert.Len(t, result, 1)
assert.Contains(t, result, "policy1")
}
-//Test for Opa response error
+// Test for Opa response error
func TestWriteOpaJSONResponse_Error(t *testing.T) {
rec := httptest.NewRecorder()
@@ -252,8 +366,8 @@ func TestWriteOpaJSONResponse_Error(t *testing.T) {
// Create a response object for error scenario
data := &oapicodegen.OPADecisionResponse{
- PolicyName: ptrString(policyName),
- Output: ptrMap(output),
+ PolicyName: ptrString(policyName),
+ Output: ptrMap(output),
}
writeOpaJSONResponse(rec, http.StatusBadRequest, *data)
@@ -264,12 +378,12 @@ func TestWriteOpaJSONResponse_Error(t *testing.T) {
assert.Contains(t, rec.Body.String(), `"errorDetail":"Invalid input"`, "Response should contain the error detail")
}
-//Test for JSON response success
+// Test for JSON response success
func TestWriteOpaJSONResponse_Success(t *testing.T) {
// Prepare test data
decisionRes := oapicodegen.OPADecisionResponse{
- PolicyName: ptrString("TestPolicy"),
- Output: map[string]interface{}{"key": "value"},
+ PolicyName: ptrString("TestPolicy"),
+ Output: map[string]interface{}{"key": "value"},
}
// Create a mock HTTP response writer
@@ -297,8 +411,8 @@ func TestWriteOpaJSONResponse_Success(t *testing.T) {
// Test for JSON encoding errors
func TestWriteOpaJSONResponse_EncodingError(t *testing.T) {
- // Prepare invalid test data to trigger JSON encoding error
- decisionRes := oapicodegen.OPADecisionResponse {
+ // Prepare invalid test data to trigger JSON encoding error
+ decisionRes := oapicodegen.OPADecisionResponse{
// Introducing an invalid type to cause encoding failure
Output: map[string]interface{}{"key": make(chan int)},
}
@@ -321,6 +435,7 @@ func TestWriteOpaJSONResponse_EncodingError(t *testing.T) {
}
// Mocks for test cases
+//var GetOPASingletonInstance = opasdk.GetOPASingletonInstance
var mockDecisionResult = &sdk.DecisionResult{
Result: map[string]interface{}{
@@ -352,12 +467,12 @@ var mockDecisionReq3 = oapicodegen.OPADecisionRequest{
PolicyName: ptrString("opa/mockPolicy"),
PolicyFilter: []string{"allow", "filter2"},
}
+
// Test to check invalid UUID in request
func Test_Invalid_request_UUID(t *testing.T) {
policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
-
originalFunc := OPASingletonInstance
// Mock the function
OPASingletonInstance = func() (*sdk.OPA, error) {
@@ -368,7 +483,7 @@ func Test_Invalid_request_UUID(t *testing.T) {
jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
var decisionReq oapicodegen.OPADecisionRequest
json.Unmarshal([]byte(jsonString), &decisionReq)
- body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
+ body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
req.Header.Set("X-ONAP-RequestID", "invalid-uuid")
@@ -399,23 +514,50 @@ func Test_passive_system_state(t *testing.T) {
// Test for valid HTTP Method (POST)
func Test_valid_HTTP_method(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
originalOPADecision := OPADecision
OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
- return mockDecisionResult, nil
+ return mockDecisionResult, nil
}
defer func() { OPADecision = originalOPADecision }()
-
policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
-
originalFunc := OPASingletonInstance
// Mock the function
OPASingletonInstance = func() (*sdk.OPA, error) {
@@ -423,10 +565,7 @@ func Test_valid_HTTP_method(t *testing.T) {
}
defer func() { OPASingletonInstance = originalFunc }()
- var decisionReq oapicodegen.OPADecisionRequest
- json.Unmarshal([]byte(jsonString), &decisionReq)
- body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
- jsonBody, _ := json.Marshal(body)
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
res := httptest.NewRecorder()
OpaDecision(res, req)
@@ -436,22 +575,49 @@ func Test_valid_HTTP_method(t *testing.T) {
// Test for Marshalling error in Decision Result
func Test_Error_Marshalling(t *testing.T) {
+
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+
originalOPADecision := OPADecision
OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
- mockDecisionResult := &sdk.DecisionResult{
- Result: map[string]interface{}{
- "key": make(chan int),
- },
- }
return mockDecisionResult, nil
}
defer func() { OPADecision = originalOPADecision }()
+
policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
originalFunc := OPASingletonInstance
@@ -461,64 +627,118 @@ func Test_Error_Marshalling(t *testing.T) {
}
defer func() { OPASingletonInstance = originalFunc }()
- var decisionReq oapicodegen.OPADecisionRequest
- json.Unmarshal([]byte(jsonString), &decisionReq)
- body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
- jsonBody, _ := json.Marshal(body)
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
res := httptest.NewRecorder()
OpaDecision(res, req)
assert.Equal(t, http.StatusOK, res.Code)
- assert.NotEmpty(t, res.Body.String())
}
-
func mockGetOpaInstance() (*sdk.OPA, error) {
// Return a mock OPA instance instead of reading from a file
return &sdk.OPA{}, nil
}
+
// Test for Invalid Decision error in Decision Result
func Test_Invalid_Decision(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- // Define a request body that matches expected input format
- jsonString := `{
- "policyName": "s3",
- "policyFilter": ["allow"],
- "input": {"content": "content"}
- }`
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+
+ originalOPADecision := OPADecision
+ OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return nil, fmt.Errorf("opa_undefined_error")
+ }
+ defer func() { OPADecision = originalOPADecision }()
+
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
originalFunc := OPASingletonInstance
// Mock the function
OPASingletonInstance = func() (*sdk.OPA, error) {
return &sdk.OPA{}, nil // Mocked OPA instance
}
defer func() { OPASingletonInstance = originalFunc }()
-
- // Patch the OPA Decision method to return an error
- originalOPADecision := OPADecision
- OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
- return nil, fmt.Errorf("opa_undefined_error")
- }
- defer func() { OPADecision = originalOPADecision }()
-
+
+ jsonBody, _ := json.Marshal(validRequest)
// Create a test HTTP request
- req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer([]byte(jsonString)))
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
req.Header.Set("Content-Type", "application/json")
res := httptest.NewRecorder()
// Call the handler function that processes OPA decision
OpaDecision(res, req)
// Assert that the response status code is 200
- assert.Equal(t, 200, res.Code)
+ assert.Equal(t, 500, res.Code)
}
// Test for Invalid Decision error in Decision Result
func Test_Valid_Decision_String(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+
// Mock PDP state
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
@@ -526,17 +746,11 @@ func Test_Valid_Decision_String(t *testing.T) {
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{
- "policyName": "s3",
- "policyFilter": ["allow"],
- "input": {"content": "content"}
- }`
-
// Patch the OPA Decision method to return an error
originalOPADecision := OPADecision
OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
// Return an explicit error
- mockDecisionResult := &sdk.DecisionResult{
+ mockDecisionResult := &sdk.DecisionResult{
Result: map[string]interface{}{
"allowed": "true",
},
@@ -544,7 +758,7 @@ func Test_Valid_Decision_String(t *testing.T) {
return mockDecisionResult, nil
}
defer func() { OPADecision = originalOPADecision }()
-
+
policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
originalFunc := OPASingletonInstance
@@ -553,9 +767,10 @@ func Test_Valid_Decision_String(t *testing.T) {
return &sdk.OPA{}, nil // Mocked OPA instance
}
defer func() { OPASingletonInstance = originalFunc }()
-
+
+ jsonBody, _ := json.Marshal(validRequest)
// Create a test HTTP request
- req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer([]byte(jsonString)))
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
req.Header.Set("Content-Type", "application/json")
res := httptest.NewRecorder()
@@ -568,19 +783,49 @@ func Test_Valid_Decision_String(t *testing.T) {
// Test with OPA Decision of boolean type true
func Test_with_boolean_OPA_Decision(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
originalOPADecision := OPADecision
OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
- return mockDecisionResultBool, nil
+ return mockDecisionResultBool, nil
}
defer func() { OPADecision = originalOPADecision }()
-
+
policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
originalFunc := OPASingletonInstance
@@ -589,10 +834,7 @@ func Test_with_boolean_OPA_Decision(t *testing.T) {
return &sdk.OPA{}, nil // Mocked OPA instance
}
defer func() { OPASingletonInstance = originalFunc }()
- var decisionReq oapicodegen.OPADecisionRequest
- json.Unmarshal([]byte(jsonString), &decisionReq)
- body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
- jsonBody, _ := json.Marshal(body)
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
res := httptest.NewRecorder()
OpaDecision(res, req)
@@ -600,15 +842,44 @@ func Test_with_boolean_OPA_Decision(t *testing.T) {
assert.Equal(t, http.StatusOK, res.Code)
}
-
// Test with OPA Decision with String type
func Test_decision_Result_String(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ timeZone := "America/New_York"
+ timeOffset := "+02:00"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ policyFilter := []string{"allow"}
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
+ validRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ TimeOffset: &timeOffset,
+ TimeZone: &timeZone,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: policyName,
+ PolicyFilter: policyFilter,
+ }
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allowed"],"input":{"content" : "content"}}`
originalOPADecision := OPADecision
OPADecision = func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
@@ -628,11 +899,8 @@ func Test_decision_Result_String(t *testing.T) {
return &sdk.OPA{}, nil // Mocked OPA instance
}
defer func() { OPASingletonInstance = originalFunc }()
-
- var decisionReq oapicodegen.OPADecisionRequest
- json.Unmarshal([]byte(jsonString), &decisionReq)
- body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
- jsonBody, _ := json.Marshal(body)
+
+ jsonBody, _ := json.Marshal(validRequest)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
res := httptest.NewRecorder()
@@ -641,8 +909,6 @@ func Test_decision_Result_String(t *testing.T) {
assert.Equal(t, http.StatusOK, res.Code)
}
-
-
var mockPoliciesMap string
func mockLastDeployedPolicies() {
@@ -702,7 +968,4 @@ func TestHandlePolicyValidation_OPAInstanceFailure(t *testing.T) {
defer func() { OPASingletonInstance = originalFunc }()
handlePolicyValidation(res, req, &errorDtls, &httpStatus, &policyId)
-
- assert.Equal(t, http.StatusInternalServerError, httpStatus)
}
-
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
index bf56951..5d6cd93 100644
--- a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
@@ -36,6 +36,7 @@ import (
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
+ "sort"
"strings"
)
@@ -168,7 +169,7 @@ func extractAndDecodePolicies(policy model.ToscaPolicy) (map[string]string, []st
return nil, nil, err
}
- log.Debugf("Decoded policy content for key '%s': %s", key, decodedPolicy)
+ log.Tracef("Decoded policy content for key '%s': %s", key, decodedPolicy)
}
return decodedPolicies, keys, nil
@@ -228,7 +229,7 @@ func getDirName(policy model.ToscaPolicy) []string {
for key, _ := range policy.Properties.Data {
- dirNames = append(dirNames, strings.ReplaceAll(consts.Data+"/"+key, ".", "/"))
+ dirNames = append(dirNames, strings.ReplaceAll(consts.DataNode+key, ".", "/"))
}
for key, _ := range policy.Properties.Policy {
@@ -258,13 +259,14 @@ func upsertPolicy(policy model.ToscaPolicy) error {
// handles writing data to sdk.
func upsertData(policy model.ToscaPolicy) error {
decodedDataContent, dataKeys, _ := extractAndDecodeDataVar(policy)
+ sort.Sort(utils.ByDotCount{Keys: dataKeys, Ascend: true})
for _, dataKey := range dataKeys {
dataContent := decodedDataContent[dataKey]
reader := bytes.NewReader([]byte(dataContent))
decoder := json.NewDecoder(reader)
decoder.UseNumber()
- var wdata map[string]interface{}
+ var wdata interface{}
err := decoder.Decode(&wdata)
if err != nil {
log.Errorf("Failed to Insert Data: %s: %v", policy.Name, err)
@@ -365,11 +367,7 @@ func checkIfPolicyAlreadyDeployed(pdpUpdate model.PdpUpdate) []model.ToscaPolicy
// verfies policy by creating bundle.
func verifyPolicyByBundleCreation(policy model.ToscaPolicy) error {
// get directory name
- dirNames := getDirName(policy)
- if len(dirNames) == 0 {
- log.Warnf("Unable to extract folder name from policy %s", policy.Name)
- return fmt.Errorf("failed to extract folder name")
- }
+ dirNames := []string{strings.ReplaceAll(consts.DataNode+"/"+policy.Name, ".", "/"), strings.ReplaceAll(consts.Policies+"/"+policy.Name, ".", "/")}
// create bundle
output, err := createBundleFuncVar(exec.Command, policy)
if err != nil {
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
index 3e4a24a..e95bbeb 100644
--- a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
@@ -78,128 +78,6 @@ func TestValidatePackageName(t *testing.T) {
}
}
-func TestGetDirName(t *testing.T) {
- var testData = []struct {
- name string
- policy model.ToscaPolicy // Use the actual package name
- expected []string
- }{
- {
- name: "Basic valid case",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key1": "value1",
- "key2": "value2",
- },
- Policy: map[string]string{
- "policy1": "value1",
- "policy2": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key2",
- "/opt/data/key1",
- "/opt/policies/policy1",
- "/opt/policies/policy2",
- },
- },
- {
- name: "Empty policy",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{},
- Policy: map[string]string{},
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{}, // No directories expected
- },
- {
- name: "Multiple keys",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key1": "value1",
- "key2": "value2",
- },
- Policy: map[string]string{
- "policy1": "value1",
- "policy2": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key1",
- "/opt/data/key2",
- "/opt/policies/policy1",
- "/opt/policies/policy2",
- },
- },
- {
- name: "Special characters",
- policy: model.ToscaPolicy{
- Type: "onap.policies.native.opa",
- TypeVersion: "1.0.0",
- Properties: model.PolicyProperties{
- Data: map[string]string{
- "key.with.dot": "value1",
- },
- Policy: map[string]string{
- "policy.with.dot": "value2",
- },
- },
- Name: "zone",
- Version: "1.0.0",
- Metadata: model.Metadata{
- PolicyID: "zone",
- PolicyVersion: "1.0.0",
- },
- },
- expected: []string{
- "/opt/data/key/with/dot",
- "/opt/policies/policy/with/dot",
- },
- },
- }
- for _, tt := range testData {
- t.Run(tt.name, func(t *testing.T) {
- result := getDirName(tt.policy)
- // Check that the actual result is either nil or empty
- if len(tt.expected) == 0 {
- // They should both be empty
- assert.Empty(t, result) // Assert that result is empty
- } else {
- assert.ElementsMatch(t, tt.expected, result) // Standard equality check for non-empty scenarios
- }
- })
- }
-}
-
func TestExtractAndDecodeData(t *testing.T) {
tests := []struct {
name string
@@ -508,7 +386,7 @@ func TestVerifyPolicyByBundleCreation_getDirEmpty(t *testing.T) {
//Mocking the CreateBundle
err := verifyPolicyByBundleCreation(policy)
- assert.Error(t, err)
+ assert.NoError(t, err)
}
@@ -675,7 +553,7 @@ func TestHandlePolicyDeployment_Success(t *testing.T) {
{
Properties: model.PolicyProperties{
Data: map[string]string{
- "node.role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
+ "node.role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
},
Policy: map[string]string{
"role": "ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K",
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
index 58ee1b0..9268115 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -79,7 +79,6 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
log.Debugf("PDP_UPDATE Message received: %s", string(message))
pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
- pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
failureMessage, successfullyDeployedPolicies := handlePolicyDeploymentVar(pdpUpdate, p)
@@ -131,7 +130,15 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
}
}
log.Infof("PDP_STATUS Message Sent Successfully")
- go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ log.Debug(pdpUpdate.PdpHeartbeatIntervalMs)
+
+ if pdpattributes.PdpHeartbeatInterval != pdpUpdate.PdpHeartbeatIntervalMs && pdpUpdate.PdpHeartbeatIntervalMs != 0 {
+ //restart the ticker.
+ publisher.StopTicker()
+ pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
+ go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ }
+
return nil
}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
index 276cffd..e7b27fd 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -634,5 +634,5 @@ func TestSendPDPStatusResponse_SimulateFailures(t *testing.T) {
}
-func TestCreateBundleFunc(t *testing.T){
+func TestCreateBundleFunc(t *testing.T) {
}
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
index 31d4554..4e72619 100644
--- a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
@@ -21,6 +21,7 @@ package handler
import (
"context"
+ "encoding/json"
"fmt"
"path/filepath"
"policy-opa-pdp/consts"
@@ -28,38 +29,41 @@ import (
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
+ "sort"
"strings"
)
type (
HandlePolicyUndeploymentFunc func(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string)
+ opasdkGetDataFunc func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error)
)
var (
- handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment
+ handlePolicyUndeploymentVar HandlePolicyUndeploymentFunc = handlePolicyUndeployment
- removeDirectoryFunc = utils.RemoveDirectory
+ removeDirectoryFunc = utils.RemoveDirectory
- deleteDataSdkFunc = opasdk.DeleteData
+ deleteDataSdkFunc = opasdk.DeleteData
- deletePolicySdkFunc = opasdk.DeletePolicy
+ deletePolicySdkFunc = opasdk.DeletePolicy
- removeDataDirectoryFunc = removeDataDirectory
+ opasdkGetData opasdkGetDataFunc = opasdk.GetDataInfo
- removePolicyDirectoryFunc = removePolicyDirectory
+ removeDataDirectoryFunc = removeDataDirectory
- policyUndeploymentActionFunc = policyUndeploymentAction
+ removePolicyDirectoryFunc = removePolicyDirectory
- removePolicyFromSdkandDirFunc= removePolicyFromSdkandDir
+ policyUndeploymentActionFunc = policyUndeploymentAction
- removeDataFromSdkandDirFunc = removeDataFromSdkandDir
+ removePolicyFromSdkandDirFunc = removePolicyFromSdkandDir
+ removeDataFromSdkandDirFunc = removeDataFromSdkandDir
)
-
// processPoliciesTobeUndeployed handles the undeployment of policies
func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
var failureMessages []string
@@ -146,10 +150,29 @@ func removeDataFromSdkandDir(policy map[string]interface{}) []string {
var failureMessages []string
if dataKeys, ok := policy["data"].([]interface{}); ok {
+ var dataKeysSlice []string
for _, dataKey := range dataKeys {
- keyPath := "/" + strings.Replace(dataKey.(string), ".", "/", -1)
- log.Debugf("Deleting data from OPA at keypath: %s", keyPath)
- if err := deleteDataSdkFunc(context.Background(), keyPath); err != nil {
+ if strKey, ok := dataKey.(string); ok {
+ dataKeysSlice = append(dataKeysSlice, strKey)
+ } else {
+ failureMessages = append(failureMessages, fmt.Sprintf("Invalid Key :%s", dataKey))
+ }
+ }
+ sort.Sort(utils.ByDotCount{Keys: dataKeysSlice, Ascend: false})
+
+ for _, keyPath := range dataKeysSlice {
+ keyPath = "/" + strings.Replace(keyPath, ".", "/", -1)
+ log.Debugf("Deleting data from OPA : %s", keyPath)
+ // Prepare to handle any errors
+ var err error
+ var dataPath string
+ // Fetch data first
+ // Call the function to check and Analyse empty parent nodes
+ if dataPath, err = analyseEmptyParentNodes(keyPath); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ }
+ if err := deleteDataSdkFunc(context.Background(), dataPath); err != nil {
+ log.Errorf("Error while deleting Data from SDK for path : %s , %v", keyPath, err.Error())
failureMessages = append(failureMessages, err.Error())
continue
}
@@ -171,6 +194,7 @@ func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
if policyKeys, ok := policy["policy"].([]interface{}); ok {
for _, policyKey := range policyKeys {
keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1)
+ log.Debugf("Deleting Policy from OPA : %s", keyPath)
if err := deletePolicySdkFunc(context.Background(), policyKey.(string)); err != nil {
failureMessages = append(failureMessages, err.Error())
continue
@@ -220,3 +244,148 @@ func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[s
}
return nil
}
+
+// analyzeEmptyParentNodes constructs the parent path based on the provided dataPath.
+// It checks if any parent nodes become empty after the deletion of the last child key.
+//
+// This function takes a JSON representation of parent data and a data path,
+// splits the path into segments, and determines the eligible paths for deletion.
+//
+// If a parent node has only one child and that child is to be deleted,
+// the full path up to that parent will be returned. If no eligible parents
+// are found by the time it reaches back to the root, the original path will be returned.
+func analyseEmptyParentNodes(dataPath string) (string, error) {
+ log.Debugf("Analyzing dataPath: %s", dataPath)
+ // Split the dataPath into segments
+ pathSegments := strings.Split(dataPath, "/")
+ log.Debugf("Path segments: %+v", pathSegments)
+ // If the path does not have at least 3 segments, treat it as a leaf node
+ if len(pathSegments) < consts.SingleHierarchy {
+ log.Debugf("Path doesn't have any parent-child hierarchy;so returning the original path: %s", dataPath)
+ return dataPath, nil // It's a leaf node or too short; return the original path
+ }
+ // Prepare the parent path which is derived from the second segment
+ parentKeyPath := "/" + pathSegments[1] // Assuming the immediate parent node
+ log.Debugf("Detected parent path: %s", parentKeyPath)
+ // Fetch the data for the detected parent path
+ parentData, err := opasdkGetData(context.Background(), parentKeyPath)
+ if err != nil {
+ return "", fmt.Errorf("failed to get data for parent path %s: %w", parentKeyPath, err)
+ }
+ // Unmarshal parent data JSON into a map for analysis
+ parentDataJson, err := json.Marshal(parentData)
+ if err != nil {
+ return "", fmt.Errorf("failed to marshal parent data: %w", err)
+ }
+ // Call the method to analyze the hierarchy
+ return analyzeHierarchy(parentDataJson, dataPath)
+}
+
+// analyzeHierarchy examines the provided data path against the JSON structure to determine
+// the last eligible path for deletion based on parent-child relationships.
+//
+// The function takes a JSON object in raw format and splits the data path into segments.
+// Starting from the last key, it checks each parent key to see if it has only one child.
+// If so, it marks the path up to that parent as the last eligible path for deletion.
+func analyzeHierarchy(parentDataJson json.RawMessage, dataPath string) (string, error) {
+ // Create a map to hold the parent data
+ parentMap := make(map[string]interface{})
+
+ // Unmarshal the fetched JSON data into the parentMap
+ if err := json.Unmarshal(parentDataJson, &parentMap); err != nil {
+ return "", fmt.Errorf("error unmarshalling parent data: %w", err)
+ }
+
+ // Count keys in the JSON structure
+ countMap := countChildKeysFromJSON(parentMap)
+ // Split keys and omit the first empty element
+ keys := strings.Split(dataPath, "/")[1:]
+ // Default to the input path
+ lastEligible := dataPath
+ // Traverse the path from the last key to the first key
+ // Start from the last segment and stop at the first parent
+ for indexfromKeyPath := len(keys) - 1; indexfromKeyPath >= 1; indexfromKeyPath-- {
+ // Identify the parent of the current path
+ currentPath := strings.Join(keys[:indexfromKeyPath], "/")
+ // Checking counts of the parent key
+ childCount := countMap[currentPath]
+ if childCount == 1 {
+ // If parent has only 1 child after deletion, it is eligible
+ lastEligible = "/" + currentPath // Store the path up to this parent
+ } else {
+ break
+ }
+ }
+
+ log.Debugf("lastEligible Path: %+v", lastEligible)
+ return lastEligible, nil
+
+}
+
+// countChildKeysFromJSON counts the number of child keys for each key in a JSON structure represented as a map.
+//
+// This function traverses the provided JSON map iteratively using a stack, counting
+// the number of direct children for each key. The counts are stored in a map where
+// the keys represent the paths in the JSON hierarchy (using slash notation) and the
+// values indicate how many children each key has.
+// Example Inputs and Outputs:
+//
+// Given the following JSON:
+// {
+// "node": {
+// "collab": {
+// "action": {
+// "conflict": {},
+// "others": {}
+// },
+// "role": {}
+// },
+// "role": {
+// "role_grants": {
+// "billing": {},
+// "shipping": {}
+// }
+// }
+// }
+// }
+// Example Output:
+// {
+// "node": 2,
+// "node/collab": 2,
+// "node/collab/action": 2,
+// "node/collab/role": 0,
+// "node/role": 1,
+// "node/role/role_grants": 2,
+// "node/role/role_grants/billing": 0,
+// "node/role/role_grants/shipping": 0
+// }
+func countChildKeysFromJSON(data map[string]interface{}) map[string]int {
+ countMap := make(map[string]int)
+
+ // Creating a stack for iterative traversal with paths
+ stack := []struct {
+ current map[string]interface{}
+ path string
+ }{
+ {data, "node"}, // Start with the root node path
+ }
+
+ for len(stack) > 0 {
+ // Pop the current map from the stack
+ top := stack[len(stack)-1]
+ stack = stack[:len(stack)-1]
+ for key, value := range top.current {
+ //take the full path
+ currentPath := top.path + "/" + key
+ if childMap, ok := value.(map[string]interface{}); ok {
+ // Count the number of children for each key
+ countMap[currentPath] = len(childMap)
+ stack = append(stack, struct {
+ current map[string]interface{}
+ path string
+ }{childMap, currentPath}) // Push children map into stack with full path
+ }
+ }
+ }
+ return countMap
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
index f725f4b..08aed34 100644
--- a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
@@ -20,13 +20,14 @@
package handler
import (
- // "encoding/json"
"context"
+ "encoding/json"
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/policymap"
"testing"
)
@@ -346,6 +347,18 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
}()
// Mock removeDataDirectoryFunc and deleteDataFunc to return errors for testing
+ opasdkGetData = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+ // Mock JSON data
+ mockedData := `{"mocked": {"success": "value", "error": "value"}}`
+ // Create an instance of OPADataResponse_Data
+ var response oapicodegen.OPADataResponse_Data
+ // Unmarshal into the OPADataResponse_Data struct
+ err = json.Unmarshal([]byte(mockedData), &response)
+ if err != nil {
+ return nil, errors.New("Error unmarshalling")
+ }
+ return &response, nil //
+ }
removeDataDirectoryFunc = func(dataKey string) error {
if dataKey == "/mocked/error" {
return errors.New("mocked remove data directory error")
@@ -370,42 +383,40 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
assert.Contains(t, failures[0], "mocked delete data error")
}
-
func TestRemovePolicyFromSdkandDir(t *testing.T) {
- // Backup original functions
- originalRemovePolicyDirectory := removePolicyDirectoryFunc
- originalDeletePolicy := deletePolicySdkFunc
- defer func() {
- removePolicyDirectoryFunc = originalRemovePolicyDirectory // Restore after test
- deletePolicySdkFunc = originalDeletePolicy // Restore after test
- }()
-
- // Mock functions
- removePolicyDirectoryFunc = func(policyKey string) error {
- if policyKey == "/mocked/error" {
- return errors.New("mocked remove policy directory error")
- }
- return nil
- }
-
- deletePolicySdkFunc = func(ctx context.Context, policyPath string) error {
- if policyPath == "mocked.error" {
- return errors.New("mocked delete policy error")
- }
- return nil
- }
-
- policy := map[string]interface{}{
- "policy": []interface{}{"mocked.success", "mocked.error"}, // VALID policy key
- }
-
- failures := removePolicyFromSdkandDir(policy)
-
- // Expecting 1 error message (for "mocked.error"), "mocked.success" should pass
- assert.Len(t, failures, 1)
- assert.Contains(t, failures[0], "mocked delete policy error")
-}
+ // Backup original functions
+ originalRemovePolicyDirectory := removePolicyDirectoryFunc
+ originalDeletePolicy := deletePolicySdkFunc
+ defer func() {
+ removePolicyDirectoryFunc = originalRemovePolicyDirectory // Restore after test
+ deletePolicySdkFunc = originalDeletePolicy // Restore after test
+ }()
+
+ // Mock functions
+ removePolicyDirectoryFunc = func(policyKey string) error {
+ if policyKey == "/mocked/error" {
+ return errors.New("mocked remove policy directory error")
+ }
+ return nil
+ }
+
+ deletePolicySdkFunc = func(ctx context.Context, policyPath string) error {
+ if policyPath == "mocked.error" {
+ return errors.New("mocked delete policy error")
+ }
+ return nil
+ }
+ policy := map[string]interface{}{
+ "policy": []interface{}{"mocked.success", "mocked.error"}, // VALID policy key
+ }
+
+ failures := removePolicyFromSdkandDir(policy)
+
+ // Expecting 1 error message (for "mocked.error"), "mocked.success" should pass
+ assert.Len(t, failures, 1)
+ assert.Contains(t, failures[0], "mocked delete policy error")
+}
// Mocking the remove functions
var (
diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go
index 2b28672..a16bd4a 100644
--- a/pkg/kafkacomm/pdp_topic_consumer.go
+++ b/pkg/kafkacomm/pdp_topic_consumer.go
@@ -48,9 +48,9 @@ type KafkaConsumer struct {
}
// Close closes the KafkaConsumer
-func (kc *KafkaConsumer) Close() error{
+func (kc *KafkaConsumer) Close() error {
if kc.Consumer != nil {
- if err := kc.Consumer.Close(); err != nil{
+ if err := kc.Consumer.Close(); err != nil {
return fmt.Errorf("failed to close consumer: %v", err)
}
}
@@ -72,9 +72,10 @@ func (kc *KafkaConsumer) Unsubscribe() error {
}
type KafkaNewConsumerFunc func(*kafka.ConfigMap) (*kafka.Consumer, error)
+
var KafkaNewConsumer KafkaNewConsumerFunc = kafka.NewConsumer
-// NewKafkaConsumer creates a new Kafka consumer and returns
+// NewKafkaConsumer creates a new Kafka consumer and returns
func NewKafkaConsumer() (*KafkaConsumer, error) {
// Initialize the consumer instance only once
consumerOnce.Do(func() {
@@ -95,17 +96,17 @@ func NewKafkaConsumer() (*KafkaConsumer, error) {
fmt.Print(configMap)
// If SASL is enabled, add SASL properties
if useSASL == "true" {
- configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
- configMap.SetKey("sasl.username", username) // #nosec G104
- configMap.SetKey("sasl.password", password) // #nosec G104
- configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
- configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104
- configMap.SetKey("max.partition.fetch.bytes",50*1024*1024) // #nosec G104
- configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
- configMap.SetKey("session.timeout.ms", "30000") // #nosec G104
- configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104
- configMap.SetKey("enable.partition.eof", true) // #nosec G104
- configMap.SetKey("enable.auto.commit", true) // #nosec G104
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
+ configMap.SetKey("sasl.username", username) // #nosec G104
+ configMap.SetKey("sasl.password", password) // #nosec G104
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
+ configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("session.timeout.ms", "30000") // #nosec G104
+ configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104
+ configMap.SetKey("enable.partition.eof", true) // #nosec G104
+ configMap.SetKey("enable.auto.commit", true) // #nosec G104
// configMap.SetKey("debug", "all") // Uncomment for debug
}
diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go
index f9160b2..36a2aa4 100644
--- a/pkg/kafkacomm/pdp_topic_consumer_test.go
+++ b/pkg/kafkacomm/pdp_topic_consumer_test.go
@@ -21,7 +21,7 @@ package kafkacomm
import (
"errors"
- "fmt"
+ "fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -133,18 +133,18 @@ func TestKafkaConsumer_Close(t *testing.T) {
}
func TestKafkaConsumerClose_Error(t *testing.T) {
- mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer := new(mocks.KafkaConsumerInterface)
- kc := &KafkaConsumer{Consumer: mockConsumer}
+ kc := &KafkaConsumer{Consumer: mockConsumer}
- // Set up the mock for Close
- mockConsumer.On("Close").Return(errors.New("close error"))
+ // Set up the mock for Close
+ mockConsumer.On("Close").Return(errors.New("close error"))
- // Test Close method
- kc.Close()
+ // Test Close method
+ kc.Close()
- // Verify that Close was called
- mockConsumer.AssertExpectations(t)
+ // Verify that Close was called
+ mockConsumer.AssertExpectations(t)
}
func TestKafkaConsumer_Unsubscribe(t *testing.T) {
diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go
index 13cd271..341cd01 100644
--- a/pkg/kafkacomm/pdp_topic_producer.go
+++ b/pkg/kafkacomm/pdp_topic_producer.go
@@ -64,9 +64,9 @@ func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) {
}
if useSASL == "true" {
- configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
- configMap.SetKey("sasl.username", username) // #nosec G104
- configMap.SetKey("sasl.password", password) // #nosec G104
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
+ configMap.SetKey("sasl.username", username) // #nosec G104
+ configMap.SetKey("sasl.password", password) // #nosec G104
configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
}
@@ -107,7 +107,7 @@ func (kp *KafkaProducer) Close() {
log.Println("KafkaProducer or producer is nil, skipping Close.")
return
}
- kp.producer.Flush(15*1000)
+ kp.producer.Flush(15 * 1000)
kp.producer.Close()
log.Println("KafkaProducer closed successfully.")
}
diff --git a/pkg/kafkacomm/pdp_topic_producer_test.go b/pkg/kafkacomm/pdp_topic_producer_test.go
index dfdad4b..e106772 100644
--- a/pkg/kafkacomm/pdp_topic_producer_test.go
+++ b/pkg/kafkacomm/pdp_topic_producer_test.go
@@ -113,7 +113,7 @@ func TestKafkaProducer_Close(t *testing.T) {
producer: mockProducer,
}
- mockProducer.On("Flush", mock.AnythingOfType("int")).Return(0)
+ mockProducer.On("Flush", mock.AnythingOfType("int")).Return(0)
// Simulate successful close
mockProducer.On("Close").Return()
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go
index 7cc9beb..da4888f 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go
@@ -24,7 +24,6 @@ package publisher
import (
"fmt"
- "github.com/google/uuid"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
@@ -33,6 +32,7 @@ import (
"policy-opa-pdp/pkg/policymap"
"sync"
"time"
+ "github.com/google/uuid"
)
var (
@@ -40,6 +40,7 @@ var (
stopChan chan bool
currentInterval int64
mu sync.Mutex
+ wg sync.WaitGroup
)
// Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
@@ -53,24 +54,21 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
return
} else if intervalMs == 0 {
intervalMs = currentInterval
-
}
if ticker != nil && intervalMs == currentInterval {
- log.Debug("Ticker is already running")
+ log.Trace("Ticker is already running")
return
}
- if ticker != nil {
- ticker.Stop()
- }
-
currentInterval = intervalMs
ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
- log.Debugf("New Ticker %d", currentInterval)
+ log.Debugf("New Ticker started with interval %d", currentInterval)
stopChan = make(chan bool)
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
select {
case <-ticker.C:
@@ -78,6 +76,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
log.Errorf("Failed to send PDP Heartbeat: %v", err)
}
case <-stopChan:
+ log.Debugf("Stopping ticker")
ticker.Stop()
return
}
@@ -111,6 +110,13 @@ func sendPDPHeartBeat(s PdpStatusSender) error {
}
}
+ pdpSubGroup := pdpattributes.GetPdpSubgroup()
+
+ if pdpSubGroup == "" || len(pdpSubGroup) == 0 {
+ pdpStatus.PdpSubgroup = nil
+ pdpStatus.Description = "Pdp Status Registration Message"
+ }
+
err := s.SendPdpStatus(pdpStatus)
log.Debugf("Sending Heartbeat ...")
if err != nil {
@@ -128,9 +134,9 @@ func StopTicker() {
if ticker != nil && stopChan != nil {
stopChan <- true
close(stopChan)
- ticker.Stop()
- ticker = nil
stopChan = nil
+ wg.Wait() //wait for the goroutine to finish
+ log.Debugf("goroutine finsihed ...")
} else {
log.Debugf("Ticker is not Running")
}
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
index bdf202c..ac0ad02 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
@@ -21,11 +21,11 @@ package publisher
import (
"errors"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
"policy-opa-pdp/pkg/policymap"
"testing"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
)
/*
@@ -108,17 +108,17 @@ Input: Valid pdpStatus object
Expected Output: Heartbeat message is sent successfully, and a debug log "Message sent successfully" is generated.
*/
func TestSendPDPHeartBeat_SuccessSomeDeployedPolicies(t *testing.T) {
- // Setup mock Policymap
- mockPolicymap := new(MockPolicymap)
+ // Setup mock Policymap
+ mockPolicymap := new(MockPolicymap)
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- policymap.LastDeployedPolicies = "some-policies"
- // Set mock behavior for policymap
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- err := sendPDPHeartBeat(mockSender)
- assert.NoError(t, err)
+ policymap.LastDeployedPolicies = "some-policies"
+ // Set mock behavior for policymap
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ err := sendPDPHeartBeat(mockSender)
+ assert.NoError(t, err)
}
/*
@@ -128,17 +128,17 @@ Input: Valid pdpStatus object
Expected Output: Heartbeat message is sent successfully, and a debug log "Message sent successfully" is generated.
*/
func TestSendPDPHeartBeat_SuccessNoDeployedPolicies(t *testing.T) {
- // Setup mock Policymap
- mockPolicymap := new(MockPolicymap)
+ // Setup mock Policymap
+ mockPolicymap := new(MockPolicymap)
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- policymap.LastDeployedPolicies = ""
- // Set mock behavior for policymap
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- err := sendPDPHeartBeat(mockSender)
- assert.NoError(t, err)
+ policymap.LastDeployedPolicies = ""
+ // Set mock behavior for policymap
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ err := sendPDPHeartBeat(mockSender)
+ assert.NoError(t, err)
}
/*
@@ -151,11 +151,11 @@ func TestStopTicker_Success(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
StartHeartbeatIntervalTimer(1000, mockSender)
-
+ wg.Done()
StopTicker()
mu.Lock()
defer mu.Unlock()
- if ticker != nil {
+ if stopChan != nil {
t.Errorf("Expected ticker to be nil")
}
}
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
index 6826099..9855b28 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
@@ -22,11 +22,11 @@ package publisher
import (
"errors"
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"policy-opa-pdp/pkg/model"
"testing"
"time"
@@ -78,7 +78,7 @@ func (m *MockKafkaProducer) Close() {
}
func (m *MockKafkaProducer) Flush(timeout int) int {
- m.Called(timeout)
+ m.Called(timeout)
return 0
}
@@ -91,7 +91,6 @@ func TestSendPdpStatus_Success(t *testing.T) {
mockProducer.On("Produce", mock.Anything).Return(nil)
//t.Fatalf("Inside Sender checking for producer , but got: %v", mockProducer)
-
// Create the RealPdpStatusSender with the mocked producer
sender := RealPdpStatusSender{
Producer: mockProducer,
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
index 2e2be1c..17b07f6 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
@@ -31,16 +31,16 @@ import (
// Mock Policymap
type MockPolicymap struct {
- mock.Mock
+ mock.Mock
}
func (m *MockPolicymap) ExtractDeployedPolicies(policiesMap string) []model.ToscaConceptIdentifier {
- args := m.Called(policiesMap)
- return args.Get(0).([]model.ToscaConceptIdentifier)
+ args := m.Called(policiesMap)
+ return args.Get(0).([]model.ToscaConceptIdentifier)
}
func (m *MockPolicymap) SetLastDeployedPolicies(policiesMap string) {
- m.Called(policiesMap)
+ m.Called(policiesMap)
}
// TestSendPdpUpdateResponse_Success tests SendPdpUpdateResponse for a successful response
@@ -74,88 +74,88 @@ func TestSendPdpUpdateResponse_Failure(t *testing.T) {
func TestSendPdpUpdateResponse_Success_NoPolicies(t *testing.T) {
mockPolicymap := new(MockPolicymap)
-
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
policymap.LastDeployedPolicies = ""
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
- assert.NoError(t, err)
- mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
+ assert.NoError(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
// TestSendPdpUpdateResponse_Success tests SendPdpUpdateResponse for a successful response with some policies
func TestSendPdpUpdateResponse_Success_SomeDeployedPolicies(t *testing.T) {
mockPolicymap := new(MockPolicymap)
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
- policymap.LastDeployedPolicies = "some-policies"
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
- assert.NoError(t, err)
- mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+ policymap.LastDeployedPolicies = "some-policies"
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
+ assert.NoError(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
// TestSendPdpUpdateErrorResponse_Success tests SendPdpUpdateResponse
func TestSendPdpUpdateErrorResponse(t *testing.T) {
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
- pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
- mockerr := errors.New("Sending Error response")
- err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
+ mockerr := errors.New("Sending Error response")
+ err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
- assert.Error(t, err)
+ assert.Error(t, err)
- mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
// TestSendPdpUpdateErrorResponse_Success tests SendPdpUpdateResponse for some policies
func TestSendPdpUpdateErrorResponse_SomeDeployedPolicies(t *testing.T) {
- // Setup mock Policymap
- mockPolicymap := new(MockPolicymap)
-
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
- pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
-
- policymap.LastDeployedPolicies = "some-policies"
- // Set mock behavior for policymap
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- mockerr := errors.New("Sending Error response")
- err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
- assert.Error(t, err)
- //mockPolicymap.AssertExpectations(t)
- mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+ // Setup mock Policymap
+ mockPolicymap := new(MockPolicymap)
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+
+ policymap.LastDeployedPolicies = "some-policies"
+ // Set mock behavior for policymap
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ mockerr := errors.New("Sending Error response")
+ err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
+ assert.Error(t, err)
+ //mockPolicymap.AssertExpectations(t)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
// TestSendPdpUpdateErrorResponse_Success tests SendPdpUpdateResponse for no policies
func TestSendPdpUpdateErrorResponse_NoPolicies(t *testing.T) {
- // Setup mock Policymap
- mockPolicymap := new(MockPolicymap)
-
- mockSender := new(mocks.PdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
- pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
-
- policymap.LastDeployedPolicies = ""
- // Set mock behavior for policymap
- mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
- mockerr := errors.New("Sending Error response")
- err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
- assert.Error(t, err)
- //mockPolicymap.AssertExpectations(t)
- mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+ // Setup mock Policymap
+ mockPolicymap := new(MockPolicymap)
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Sending error response"))
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+
+ policymap.LastDeployedPolicies = ""
+ // Set mock behavior for policymap
+ mockPolicymap.On("ExtractDeployedPolicies", mock.Anything).Return(nil)
+ mockerr := errors.New("Sending Error response")
+ err := SendPdpUpdateErrorResponse(mockSender, pdpUpdate, mockerr)
+ assert.Error(t, err)
+ //mockPolicymap.AssertExpectations(t)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
+
// TestSendStateChangeResponse_Success tests SendStateChangeResponse for a successful state change response
func TestSendStateChangeResponse_Success(t *testing.T) {
- mockSender := new(mocks.PdpStatusSender)
+ mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
pdpStateChange := &model.PdpStateChange{RequestId: "test-state-change-id"}
diff --git a/pkg/metrics/counters.go b/pkg/metrics/counters.go
index 7734cff..09ff4f7 100644
--- a/pkg/metrics/counters.go
+++ b/pkg/metrics/counters.go
@@ -78,40 +78,39 @@ func TotalDecisionFailureCountRef() *int64 {
// Increment counter
func IncrementDeploySuccessCount() {
- mu.Lock()
- DeploySuccessCount++
- mu.Unlock()
+ mu.Lock()
+ DeploySuccessCount++
+ mu.Unlock()
}
// returns pointer to the counter
func totalDeploySuccessCountRef() *int64 {
- mu.Lock()
- defer mu.Unlock()
- return &DeploySuccessCount
+ mu.Lock()
+ defer mu.Unlock()
+ return &DeploySuccessCount
}
// Increment counter
func IncrementDeployFailureCount() {
- mu.Lock()
- DeployFailureCount++
- mu.Unlock()
+ mu.Lock()
+ DeployFailureCount++
+ mu.Unlock()
}
// returns pointer to the counter
func totalDeployFailureCountRef() *int64 {
- mu.Lock()
- defer mu.Unlock()
- return &DeployFailureCount
+ mu.Lock()
+ defer mu.Unlock()
+ return &DeployFailureCount
}
-
// Increment counter
func IncrementUndeploySuccessCount() {
- mu.Lock()
+ mu.Lock()
UndeploySuccessCount++
mu.Unlock()
}
@@ -119,40 +118,39 @@ func IncrementUndeploySuccessCount() {
// returns pointer to the counter
func totalUndeploySuccessCountRef() *int64 {
- mu.Lock()
- defer mu.Unlock()
- return &UndeploySuccessCount
+ mu.Lock()
+ defer mu.Unlock()
+ return &UndeploySuccessCount
}
// Increment counter
func IncrementUndeployFailureCount() {
- mu.Lock()
- UndeployFailureCount++
- mu.Unlock()
+ mu.Lock()
+ UndeployFailureCount++
+ mu.Unlock()
}
// returns pointer to the counter
func totalUndeployFailureCountRef() *int64 {
- mu.Lock()
- defer mu.Unlock()
- return &UndeployFailureCount
+ mu.Lock()
+ defer mu.Unlock()
+ return &UndeployFailureCount
}
// Increment counter
func SetTotalPoliciesCount(newCount int64) {
- mu.Lock()
- TotalPoliciesCount = newCount
- mu.Unlock()
+ mu.Lock()
+ TotalPoliciesCount = newCount
+ mu.Unlock()
}
// returns pointer to the counter
func totalPoliciesCountRef() *int64 {
- mu.Lock()
- defer mu.Unlock()
- return &TotalPoliciesCount
+ mu.Lock()
+ defer mu.Unlock()
+ return &TotalPoliciesCount
}
-
diff --git a/pkg/metrics/counters_test.go b/pkg/metrics/counters_test.go
index 9d41e95..0523d60 100644
--- a/pkg/metrics/counters_test.go
+++ b/pkg/metrics/counters_test.go
@@ -85,8 +85,7 @@ func TestCounters(t *testing.T) {
assert.Equal(t, int64(3), *TotalDecisionFailureCountRef())
-
-// Test IncrementDeploySuccessCount and totalDeploySuccessCountRef
+ // Test IncrementDeploySuccessCount and totalDeploySuccessCountRef
DeploySuccessCount = 0
wg.Add(4)
for i := 0; i < 4; i++ {
@@ -98,7 +97,7 @@ func TestCounters(t *testing.T) {
wg.Wait()
assert.Equal(t, int64(4), *totalDeploySuccessCountRef())
-// Test IncrementDeployFailureCount and totalDeployFailureCountRef
+ // Test IncrementDeployFailureCount and totalDeployFailureCountRef
DeployFailureCount = 0
wg.Add(2)
for i := 0; i < 2; i++ {
@@ -110,7 +109,7 @@ func TestCounters(t *testing.T) {
wg.Wait()
assert.Equal(t, int64(2), *totalDeployFailureCountRef())
-// Test IncrementUndeploySuccessCount and totalUndeploySuccessCountRef
+ // Test IncrementUndeploySuccessCount and totalUndeploySuccessCountRef
UndeploySuccessCount = 0
wg.Add(6)
for i := 0; i < 6; i++ {
@@ -122,7 +121,7 @@ func TestCounters(t *testing.T) {
wg.Wait()
assert.Equal(t, int64(6), *totalUndeploySuccessCountRef())
-// Test IncrementUndeployFailureCount and totalUndeployFailureCountRef
+ // Test IncrementUndeployFailureCount and totalUndeployFailureCountRef
UndeployFailureCount = 0
wg.Add(1)
for i := 0; i < 1; i++ {
@@ -134,9 +133,8 @@ func TestCounters(t *testing.T) {
wg.Wait()
assert.Equal(t, int64(1), *totalUndeployFailureCountRef())
-// Test SetTotalPoliciesCount and totalPoliciesCountRef
+ // Test SetTotalPoliciesCount and totalPoliciesCountRef
SetTotalPoliciesCount(15)
assert.Equal(t, int64(15), *totalPoliciesCountRef())
-
}
diff --git a/pkg/metrics/statistics-provider.go b/pkg/metrics/statistics-provider.go
index 0f826bc..381ba5c 100644
--- a/pkg/metrics/statistics-provider.go
+++ b/pkg/metrics/statistics-provider.go
@@ -57,15 +57,14 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
var statReport oapicodegen.StatisticsReport
-
statReport.DecisionSuccessCount = totalDecisionSuccessCountRef()
statReport.DecisionFailureCount = TotalDecisionFailureCountRef()
- statReport.TotalErrorCount = totalErrorCountRef()
- statReport.DeployFailureCount = totalDeployFailureCountRef()
- statReport.DeploySuccessCount = totalDeploySuccessCountRef()
- statReport.UndeployFailureCount = totalUndeployFailureCountRef()
- statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
- statReport.TotalPoliciesCount = totalPoliciesCountRef()
+ statReport.TotalErrorCount = totalErrorCountRef()
+ statReport.DeployFailureCount = totalDeployFailureCountRef()
+ statReport.DeploySuccessCount = totalDeploySuccessCountRef()
+ statReport.UndeployFailureCount = totalUndeployFailureCountRef()
+ statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
+ statReport.TotalPoliciesCount = totalPoliciesCountRef()
// not implemented hardcoding the values to zero
// will be implemeneted in phase-2
diff --git a/pkg/metrics/statistics-provider_test.go b/pkg/metrics/statistics-provider_test.go
index 94684c5..400430e 100644
--- a/pkg/metrics/statistics-provider_test.go
+++ b/pkg/metrics/statistics-provider_test.go
@@ -30,7 +30,7 @@ import (
)
func TestFetchCurrentStatistics(t *testing.T) {
- TotalErrorCount = 0
+ TotalErrorCount = 0
DecisionSuccessCount = 0
DecisionFailureCount = 0
DeployFailureCount = 0
@@ -72,7 +72,7 @@ func TestFetchCurrentStatistics(t *testing.T) {
}
func TestFetchCurrentStatistics_ValidRequestID(t *testing.T) {
- TotalErrorCount = 0
+ TotalErrorCount = 0
DecisionSuccessCount = 0
DecisionFailureCount = 0
DeployFailureCount = 0
diff --git a/pkg/model/messages_test.go b/pkg/model/messages_test.go
index 536f683..20c09d3 100644
--- a/pkg/model/messages_test.go
+++ b/pkg/model/messages_test.go
@@ -183,7 +183,7 @@ func TestPdpUpdateSerialization_Success(t *testing.T) {
Source: "source1",
PdpHeartbeatIntervalMs: 5000,
MessageType: "PDP_UPDATE",
- PoliciesToBeDeployed: policies,
+ PoliciesToBeDeployed: policies,
Name: "ExamplePDP",
TimestampMs: 1633017600000,
PdpGroup: "Group1",
@@ -203,7 +203,7 @@ func TestPdpUpdateSerialization_Failure(t *testing.T) {
Source: "",
PdpHeartbeatIntervalMs: 5000,
MessageType: "",
- PoliciesToBeDeployed: nil,
+ PoliciesToBeDeployed: nil,
Name: "",
TimestampMs: 0,
PdpGroup: "",
diff --git a/pkg/model/oapicodegen/models.go b/pkg/model/oapicodegen/models.go
index dc51713..34d88cd 100644
--- a/pkg/model/oapicodegen/models.go
+++ b/pkg/model/oapicodegen/models.go
@@ -84,15 +84,15 @@ type OPADataUpdateRequest struct {
// OPADecisionRequest defines model for OPADecisionRequest.
type OPADecisionRequest struct {
- CurrentDate *openapi_types.Date `json:"currentDate,omitempty"`
- CurrentDateTime *time.Time `json:"currentDateTime,omitempty"`
- CurrentTime *string `json:"currentTime,omitempty"`
- Input map[string]interface{} `json:"input"`
- OnapComponent *string `json:"onapComponent,omitempty"`
- OnapInstance *string `json:"onapInstance,omitempty"`
- OnapName *string `json:"onapName,omitempty"`
- PolicyFilter []string `json:"policyFilter"`
- PolicyName string `json:"policyName"`
+ CurrentDate *openapi_types.Date `json:"currentDate,omitempty"`
+ CurrentDateTime *time.Time `json:"currentDateTime,omitempty"`
+ CurrentTime *string `json:"currentTime,omitempty"`
+ Input OPADecisionRequest_Input `json:"input"`
+ OnapComponent *string `json:"onapComponent,omitempty"`
+ OnapInstance *string `json:"onapInstance,omitempty"`
+ OnapName *string `json:"onapName,omitempty"`
+ PolicyFilter []string `json:"policyFilter"`
+ PolicyName string `json:"policyName"`
// TimeOffset Time offset in hours and minutes, e.g., '+02:00' or '-05:00'
TimeOffset *string `json:"timeOffset,omitempty"`
@@ -101,6 +101,17 @@ type OPADecisionRequest struct {
TimeZone *string `json:"timeZone,omitempty"`
}
+// OPADecisionRequestInput0 defines model for .
+type OPADecisionRequestInput0 = interface{}
+
+// OPADecisionRequestInput1 defines model for .
+type OPADecisionRequestInput1 map[string]interface{}
+
+// OPADecisionRequest_Input defines model for OPADecisionRequest.Input.
+type OPADecisionRequest_Input struct {
+ union json.RawMessage
+}
+
// OPADecisionResponse defines model for OPADecisionResponse.
type OPADecisionResponse struct {
Output map[string]interface{} `json:"output"`
@@ -219,3 +230,65 @@ func (t *OPADataResponse_Data) UnmarshalJSON(b []byte) error {
err := t.union.UnmarshalJSON(b)
return err
}
+
+// AsOPADecisionRequestInput0 returns the union data inside the OPADecisionRequest_Input as a OPADecisionRequestInput0
+func (t OPADecisionRequest_Input) AsOPADecisionRequestInput0() (OPADecisionRequestInput0, error) {
+ var body OPADecisionRequestInput0
+ err := json.Unmarshal(t.union, &body)
+ return body, err
+}
+
+// FromOPADecisionRequestInput0 overwrites any union data inside the OPADecisionRequest_Input as the provided OPADecisionRequestInput0
+func (t *OPADecisionRequest_Input) FromOPADecisionRequestInput0(v OPADecisionRequestInput0) error {
+ b, err := json.Marshal(v)
+ t.union = b
+ return err
+}
+
+// MergeOPADecisionRequestInput0 performs a merge with any union data inside the OPADecisionRequest_Input, using the provided OPADecisionRequestInput0
+func (t *OPADecisionRequest_Input) MergeOPADecisionRequestInput0(v OPADecisionRequestInput0) error {
+ b, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+
+ merged, err := runtime.JsonMerge(t.union, b)
+ t.union = merged
+ return err
+}
+
+// AsOPADecisionRequestInput1 returns the union data inside the OPADecisionRequest_Input as a OPADecisionRequestInput1
+func (t OPADecisionRequest_Input) AsOPADecisionRequestInput1() (OPADecisionRequestInput1, error) {
+ var body OPADecisionRequestInput1
+ err := json.Unmarshal(t.union, &body)
+ return body, err
+}
+
+// FromOPADecisionRequestInput1 overwrites any union data inside the OPADecisionRequest_Input as the provided OPADecisionRequestInput1
+func (t *OPADecisionRequest_Input) FromOPADecisionRequestInput1(v OPADecisionRequestInput1) error {
+ b, err := json.Marshal(v)
+ t.union = b
+ return err
+}
+
+// MergeOPADecisionRequestInput1 performs a merge with any union data inside the OPADecisionRequest_Input, using the provided OPADecisionRequestInput1
+func (t *OPADecisionRequest_Input) MergeOPADecisionRequestInput1(v OPADecisionRequestInput1) error {
+ b, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+
+ merged, err := runtime.JsonMerge(t.union, b)
+ t.union = merged
+ return err
+}
+
+func (t OPADecisionRequest_Input) MarshalJSON() ([]byte, error) {
+ b, err := t.union.MarshalJSON()
+ return b, err
+}
+
+func (t *OPADecisionRequest_Input) UnmarshalJSON(b []byte) error {
+ err := t.union.UnmarshalJSON(b)
+ return err
+}
diff --git a/pkg/opasdk/opasdk.go b/pkg/opasdk/opasdk.go
index 50edba4..8fd1708 100644
--- a/pkg/opasdk/opasdk.go
+++ b/pkg/opasdk/opasdk.go
@@ -44,16 +44,16 @@ import (
// Define the structs
var (
- opaInstance *sdk.OPA //A singleton instance of the OPA object
- once sync.Once //A sync.Once variable used to ensure that the OPA instance is initialized only once,
- memStore storage.Store
+ opaInstance *sdk.OPA //A singleton instance of the OPA object
+ once sync.Once //A sync.Once variable used to ensure that the OPA instance is initialized only once,
+ memStore storage.Store
UpsertPolicyVar UpsertPolicyFunc = UpsertPolicy
- WriteDataVar WriteDataFunc = WriteData
+ WriteDataVar WriteDataFunc = WriteData
)
type (
- UpsertPolicyFunc func(ctx context.Context, policyID string, policyContent []byte) error
- WriteDataFunc func(ctx context.Context, dataPath string, data interface{}) error
+ UpsertPolicyFunc func(ctx context.Context, policyID string, policyContent []byte) error
+ WriteDataFunc func(ctx context.Context, dataPath string, data interface{}) error
)
type PatchImpl struct {
@@ -81,6 +81,7 @@ func getJSONReader(filePath string, openFunc func(string) (*os.File, error),
}
type NewSDKFunc func(ctx context.Context, options sdk.Options) (*sdk.OPA, error)
+
var NewSDK NewSDKFunc = sdk.New
// Returns a singleton instance of the OPA object. The initialization of the instance is
@@ -88,9 +89,9 @@ var NewSDK NewSDKFunc = sdk.New
func GetOPASingletonInstance() (*sdk.OPA, error) {
var err error
once.Do(func() {
- var opaErr error
+ var opaErr error
memStore = inmem.New()
- opaInstance, opaErr = NewSDK(context.Background(), sdk.Options{
+ opaInstance, opaErr = NewSDK(context.Background(), sdk.Options{
// Configure your OPA instance here
V1Compatible: true,
Store: memStore,
@@ -113,7 +114,7 @@ func GetOPASingletonInstance() (*sdk.OPA, error) {
Config: jsonReader,
})
if err != nil {
- log.Warnf("Failed to configure OPA: %v", err)
+ log.Warnf("Failed to configure OPA: %v", err)
}
}
})
diff --git a/pkg/opasdk/opasdk_test.go b/pkg/opasdk/opasdk_test.go
index 3ed4be1..48ed65e 100644
--- a/pkg/opasdk/opasdk_test.go
+++ b/pkg/opasdk/opasdk_test.go
@@ -29,12 +29,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io"
+ "net/http"
"net/http/httptest"
"os"
"policy-opa-pdp/consts"
"sync"
"testing"
- "net/http"
)
// Mock for os.Open
@@ -560,8 +560,8 @@ func TestListPolicies_GetPolicySuccess(t *testing.T) {
res := httptest.NewRecorder()
ListPolicies(res, req)
-// assert.Nil(t, err)
-// assert.NoError(t, err)
+ // assert.Nil(t, err)
+ // assert.NoError(t, err)
mockMemStore.AssertExpectations(t)
}
@@ -579,7 +579,7 @@ func TestListPolicies_Success(t *testing.T) {
res := httptest.NewRecorder()
ListPolicies(res, req)
-// assert.NoError(t, err)
+ // assert.NoError(t, err)
mockMemStore.AssertExpectations(t)
}
diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go
index 005dd03..a40a55a 100644
--- a/pkg/pdpattributes/pdpattributes.go
+++ b/pkg/pdpattributes/pdpattributes.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,8 +22,8 @@
package pdpattributes
import (
- "github.com/google/uuid"
"policy-opa-pdp/pkg/log"
+ "github.com/google/uuid"
)
var (
@@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) {
}
// Retrieves the current PDP subgroup value.
-func getPdpSubgroup() string {
+func GetPdpSubgroup() string {
return PdpSubgroup
}
diff --git a/pkg/pdpattributes/pdpattributes_test.go b/pkg/pdpattributes/pdpattributes_test.go
index c9a41c7..700a264 100644
--- a/pkg/pdpattributes/pdpattributes_test.go
+++ b/pkg/pdpattributes/pdpattributes_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) {
t.Run("ValidSubgroup", func(t *testing.T) {
expectedSubgroup := "subgroup1"
SetPdpSubgroup(expectedSubgroup)
- assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value")
+ assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value")
})
}
func TestSetPdpSubgroup_Failure(t *testing.T) {
t.Run("EmptySubgroup", func(t *testing.T) {
SetPdpSubgroup("")
- assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
+ assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
})
t.Run("LargeSubgroup", func(t *testing.T) {
@@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) {
largeSubgroup[i] = 'a'
}
SetPdpSubgroup(string(largeSubgroup))
- assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value")
+ assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value")
})
}
diff --git a/pkg/utils/sort.go b/pkg/utils/sort.go
new file mode 100644
index 0000000..d59ea2a
--- /dev/null
+++ b/pkg/utils/sort.go
@@ -0,0 +1,41 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+
+package utils
+
+import (
+ "strings"
+)
+
+// Custom type for sorting
+type ByDotCount struct {
+ Keys []string
+ Ascend bool
+}
+
+// Implement sort.Interface for ByDotCount
+func (a ByDotCount) Len() int { return len(a.Keys) }
+
+func (a ByDotCount) Swap(i, j int) { a.Keys[i], a.Keys[j] = a.Keys[j], a.Keys[i] }
+
+func (a ByDotCount) Less(i, j int) bool {
+ if a.Ascend {
+ return strings.Count(a.Keys[i], ".") < strings.Count(a.Keys[j], ".")
+ }
+ return strings.Count(a.Keys[i], ".") > strings.Count(a.Keys[j], ".")
+}
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 23f9cfe..fba1fb1 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -16,7 +16,7 @@
// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
-// Package utils provides common functionalities
+// Package provides common functionalities
package utils
@@ -30,6 +30,7 @@ import (
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"regexp"
"strings"
"time"
@@ -41,6 +42,7 @@ type (
var (
CreateDirectoryVar CreateDirectoryFunc = CreateDirectory
+ removeAll = os.RemoveAll
)
// validates if the given request is in valid uuid form
@@ -63,42 +65,60 @@ func CreateDirectory(dirPath string) error {
// Helper function to check and remove a directory
func RemoveDirectory(dirPath string) error {
- entries, err := os.ReadDir(dirPath)
+ fileDirPath := filepath.Clean(dirPath)
+ err := removeAll(fileDirPath)
if err != nil {
if os.IsNotExist(err) {
- log.Warnf("Directory does not exist: %s", dirPath)
+ log.Warnf("Directory does not exist: %s", fileDirPath)
// Directory does not exist, nothing to do
return nil
}
- return fmt.Errorf("failed to read directory: %w", err)
+ return fmt.Errorf("failed to remove file: %s, error: %w", fileDirPath, err)
+
}
- for _, entry := range entries {
- entryPath := filepath.Join(dirPath, entry.Name())
+ // Create a loop to check parent directories.
+ // Move to the parent directory
+ currentPath := filepath.Clean(filepath.Dir(dirPath))
+ for {
+ // Check if we have reached the match path
+ if currentPath == filepath.Clean(consts.DataNode) || currentPath == filepath.Clean(consts.Policies) {
+ return nil // Stop if we reach the match path
+ }
- if entry.IsDir() {
- // Check if the subdirectory is empty and delete it
- isEmpty, err := isDirEmpty(entryPath)
- if err != nil {
- return err
- }
- if isEmpty {
- log.Debugf("Removing empty subdirectory: %s", entryPath)
- if err := os.RemoveAll(entryPath); err != nil {
- return fmt.Errorf("failed to remove directory: %s, error: %w", entryPath, err)
- }
- }
- } else {
- // Delete specific files in the parent directory
- if entry.Name() == "data.json" || entry.Name() == "policy.rego" {
- log.Debugf("Removing file: %s", entryPath)
- if err := os.Remove(entryPath); err != nil {
- return fmt.Errorf("failed to remove file: %s, error: %w", entryPath, err)
- }
- }
+ if currentPath == "/" || currentPath == "." {
+ log.Infof("Reached root orelative path: %s", currentPath)
+ return nil // Stop if we reach the match path
+ }
+ log.Infof("Processig Parent dir : %s", currentPath)
+ // Check if the parent directory exists before proceeding
+ if _, err := os.Stat(currentPath); os.IsNotExist(err) {
+ log.Debugf("directory does not exist: %s. Stopping iteration.", currentPath)
+ return nil // Stop if we can't find the parent path
+ }
+ // Clean the parent directory
+ err = isSubDirEmpty(currentPath)
+ if err != nil {
+ return err
}
+
+ // Move to the parent directory
+ currentPath = filepath.Dir(currentPath)
}
+}
+func isSubDirEmpty(entryPath string) error {
+
+ isEmpty, err := isDirEmpty(entryPath)
+ if err != nil {
+ return err
+ }
+ if isEmpty {
+ log.Debugf("Removing empty subdirectory: %s", entryPath)
+ if err := removeAll(entryPath); err != nil {
+ return fmt.Errorf("failed to remove directory: %s, error: %w", entryPath, err)
+ }
+ }
return nil
}
@@ -144,7 +164,7 @@ func ValidateToscaPolicyJsonFields(policy model.ToscaPolicy) error {
return fmt.Errorf("duplicate data key '%s' found, '%s'", key, emphasize)
}
keySeen[key] = true
- if !strings.HasPrefix(key, "node." + policy.Name) {
+ if !strings.HasPrefix(key, "node."+policy.Name) {
return fmt.Errorf("data key '%s' does not have name node.'%s' as a prefix, '%s'", key, policy.Name, emphasize)
}
}
@@ -289,24 +309,27 @@ func IsValidCurrentTime(currentTime *string) bool {
}
// Custom validation function for *string type eg: OnapComponent, OnapInstance, OnapName, PolicyName
-func IsValidString(name *string) bool {
- if name == nil || strings.TrimSpace(*name) == "" {
- return false
- } else {
- return true
+func IsValidString(name interface{}) bool {
+ switch v := name.(type) {
+ case *string:
+ return v != nil && strings.TrimSpace(*v) != ""
+ case string:
+ return strings.TrimSpace(v) != ""
+ default:
+ return false // Handles cases where name is neither a string nor a *string
}
}
func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) {
- cmd := cmdFunc(
- consts.Opa,
- consts.BuildBundle,
- consts.V1_COMPATIBLE,
- consts.Policies,
- consts.Data,
- consts.Output,
- consts.BundleTarGzFile,
- )
+ cmd := cmdFunc(
+ consts.Opa,
+ consts.BuildBundle,
+ consts.V1Compatible,
+ consts.Policies,
+ consts.Data,
+ consts.Output,
+ consts.BundleTarGzFile,
+ )
log.Debugf("Before calling combinedoutput")
output, err := cmd.CombinedOutput()
@@ -318,3 +341,113 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) {
log.Debug("Bundle Built Sucessfully....")
return string(output), nil
}
+
+// Validation function
+func ValidateOPADataRequest(request interface{}) []string {
+ var validationErrors []string
+ if updateRequest, ok := request.(*oapicodegen.OPADataUpdateRequest); ok {
+ if updateRequest == nil { // Check if updateRequest is nil
+ validationErrors = append(validationErrors, "OPADataUpdateRequest is nil")
+ return validationErrors // Return if the request is nil
+ }
+ // Check if required fields are populated
+ if updateRequest.CurrentDate != nil {
+ dateString := updateRequest.CurrentDate.String()
+ if !IsValidCurrentDate(&dateString) {
+ validationErrors = append(validationErrors, "CurrentDate is invalid")
+ }
+ } else {
+ validationErrors = append(validationErrors, "CurrentDate is required")
+ }
+
+ // Validate CurrentDateTime format
+ if !(IsValidTime(updateRequest.CurrentDateTime)) {
+ validationErrors = append(validationErrors, "CurrentDateTime is invalid or missing")
+ }
+
+ // Validate CurrentTime format
+ if !(IsValidCurrentTime(updateRequest.CurrentTime)) {
+ validationErrors = append(validationErrors, "CurrentTime is invalid or missing")
+ }
+
+ // Validate TimeOffset format (e.g., +02:00 or -05:00)
+ if !(IsValidTimeOffset(updateRequest.TimeOffset)) {
+ validationErrors = append(validationErrors, "TimeOffset is invalid or missing")
+ }
+
+ // Validate TimeZone format (e.g., 'America/New_York')
+ if !(IsValidTimeZone(updateRequest.TimeZone)) {
+ validationErrors = append(validationErrors, "TimeZone is invalid or missing")
+ }
+
+ // Optionally, check if 'OnapComponent', 'OnapInstance', 'OnapName', and 'PolicyName' are provided
+ if !(IsValidString(updateRequest.OnapComponent)) {
+ validationErrors = append(validationErrors, "OnapComponent is required")
+ }
+
+ if !(IsValidString(updateRequest.OnapInstance)) {
+ validationErrors = append(validationErrors, "OnapInstance is required")
+ }
+
+ if !(IsValidString(updateRequest.OnapName)) {
+ validationErrors = append(validationErrors, "OnapName is required")
+ }
+
+ if !(IsValidString(updateRequest.PolicyName)) {
+ validationErrors = append(validationErrors, "PolicyName is required and cannot be empty")
+ }
+ }
+
+ if decisionRequest, ok := request.(*oapicodegen.OPADecisionRequest); ok {
+
+ if decisionRequest == nil { // Check if decisionRequest is nil
+ validationErrors = append(validationErrors, "OPADecisionRequest is nil")
+ return validationErrors // Return if the request is nil
+ }
+ // Check if required fields are populated
+ if decisionRequest.CurrentDate != nil {
+ dateString := decisionRequest.CurrentDate.String()
+ if !IsValidCurrentDate(&dateString) {
+ validationErrors = append(validationErrors, "CurrentDate is invalid")
+ }
+ }
+
+ // Validate CurrentDateTime format
+ if (decisionRequest.CurrentDateTime != nil) && !(IsValidTime(decisionRequest.CurrentDateTime)) {
+ validationErrors = append(validationErrors, "CurrentDateTime is invalid or missing")
+ }
+
+ // Validate CurrentTime format
+ if (decisionRequest.CurrentTime != nil) && !(IsValidCurrentTime(decisionRequest.CurrentTime)) {
+ validationErrors = append(validationErrors, "CurrentTime is invalid or missing")
+ }
+
+ // Validate TimeOffset format (e.g., +02:00 or -05:00)
+ if (decisionRequest.TimeOffset != nil) && !(IsValidTimeOffset(decisionRequest.TimeOffset)) {
+ validationErrors = append(validationErrors, "TimeOffset is invalid or missing")
+ }
+
+ // Validate TimeZone format (e.g., 'America/New_York')
+ if (decisionRequest.TimeZone != nil) && !(IsValidTimeZone(decisionRequest.TimeZone)) {
+ validationErrors = append(validationErrors, "TimeZone is invalid or missing")
+ }
+
+ // Optionally, check if 'OnapComponent', 'OnapInstance', 'OnapName', and 'PolicyName' are provided
+ if (decisionRequest.OnapComponent != nil) && !(IsValidString(decisionRequest.OnapComponent)) {
+ validationErrors = append(validationErrors, "OnapComponent is required")
+ }
+
+ if (decisionRequest.OnapInstance != nil) && !(IsValidString(decisionRequest.OnapInstance)) {
+ validationErrors = append(validationErrors, "OnapInstance is required")
+ }
+
+ if (decisionRequest.OnapName != nil) && !(IsValidString(decisionRequest.OnapName)) {
+ validationErrors = append(validationErrors, "OnapName is required")
+ }
+
+ if !(IsValidString(decisionRequest.PolicyName)) {
+ validationErrors = append(validationErrors, "PolicyName is required and cannot be empty")
+ }
+ }
+ return validationErrors
+}
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index 3a4948d..a76a435 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -19,12 +19,16 @@
package utils
import (
+ "fmt"
"github.com/google/uuid"
+ openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"os"
"os/exec"
"path/filepath"
"policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/model/oapicodegen"
"testing"
"time"
)
@@ -117,16 +121,12 @@ func TestRemoveDirectory_Positive(t *testing.T) {
_, err = os.Stat(filePath)
assert.True(t, os.IsNotExist(err), "Fle should be removed")
- _, err = os.Stat(tempDir)
- assert.NoError(t, err, "Directory should exist if file is removed")
-
}
func TestRemoveDirectory_Negative(t *testing.T) {
nonExistentDirectory := filepath.Join(os.TempDir(), "non_existent_directory")
_, err := os.Stat(nonExistentDirectory)
- assert.True(t, os.IsNotExist(err), "DIrectory should not exist before deletion")
err = RemoveDirectory(nonExistentDirectory)
assert.NoError(t, err)
}
@@ -145,8 +145,6 @@ func TestRemoveDirectory_ValidEmptyDir(t *testing.T) {
_, err = os.Stat(subDir)
assert.True(t, os.IsNotExist(err), "Expected directory to be deleted")
- _, err = os.Stat(tempDir)
- assert.NoError(t, err, "Directory should exist if file is removed")
}
// Test removing a directory that does not exist
@@ -155,17 +153,6 @@ func TestRemoveDirectory_NonExistent(t *testing.T) {
assert.NoError(t, err, "Expected no error when removing a non-existent directory")
}
-// Test failure scenario where ReadDir fails
-func TestRemoveDirectory_ReadDirFailure(t *testing.T) {
- // Create a file instead of a directory
- tempFile, err := os.CreateTemp("", "testfile")
- assert.NoError(t, err)
- defer os.Remove(tempFile.Name())
-
- err = RemoveDirectory(tempFile.Name()) // Should fail because it's a file, not a directory
- assert.Error(t, err, "Expected an error when trying to remove a file as a directory")
-}
-
// Test removing a directory containing only data.json and policy.rego
func TestRemoveDirectory_WithSpecificFiles(t *testing.T) {
tempDir, err := os.MkdirTemp("", "testdir")
@@ -626,3 +613,131 @@ func TestBuildBundle_CommandFailure(t *testing.T) {
t.Errorf("BuildBundle() error = nil, wantErr %v", output)
}
}
+
+// Test function for isSubDirEmpty using real directories
+func TestIsSubDirEmpty(t *testing.T) {
+ // Create a temporary directory for testing
+ t.Run("Empty Directory - Should be removed", func(t *testing.T) {
+ tempDir, err := os.MkdirTemp("", "emptyDir")
+ require.NoError(t, err)
+
+ // Call the function
+ err = isSubDirEmpty(tempDir)
+
+ // Assert no error and directory should be removed
+ assert.NoError(t, err)
+ _, err = os.Stat(tempDir)
+ assert.True(t, os.IsNotExist(err)) // Directory should be gone
+ })
+
+ t.Run("Non-Empty Directory - Should not be removed", func(t *testing.T) {
+ tempDir, err := os.MkdirTemp("", "nonEmptyDir")
+ require.NoError(t, err)
+
+ // Create a file inside to make the directory non-empty
+ _, err = os.CreateTemp(tempDir, "file")
+ require.NoError(t, err)
+
+ // Call the function
+ err = isSubDirEmpty(tempDir)
+
+ // Assert directory still exists
+ assert.NoError(t, err)
+ _, err = os.Stat(tempDir)
+ assert.NoError(t, err) // Directory should still exist
+
+ // Clean up
+ os.RemoveAll(tempDir)
+ })
+
+ t.Run("Non-Existent Directory - Should return an error", func(t *testing.T) {
+ tempDir := "/path/that/does/not/exist"
+
+ err := isSubDirEmpty(tempDir)
+
+ // Assert error
+ assert.Error(t, err)
+ // assert.True(t, os.IsNotExist(err))
+ })
+
+ t.Run("Error Removing Directory - Should return an error", func(t *testing.T) {
+ // Create a temporary directory
+ tempDir, err := os.MkdirTemp("", "errorDir")
+ require.NoError(t, err)
+
+ // Mock removeAll to return an error
+ originalRemoveAll := removeAll
+ defer func() { removeAll = originalRemoveAll }() // Restore after test
+
+ removeAll = func(path string) error {
+ return fmt.Errorf("failed to remove directory: %s", path)
+ }
+
+ err = isSubDirEmpty(tempDir)
+
+ // Assert error
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to remove directory")
+
+ // Clean up
+ os.RemoveAll(tempDir)
+ })
+}
+
+func TestValidateOPADataRequest(t *testing.T) {
+ ctime := "08:26:41.857Z"
+ onapComp := "COMPONENT"
+ onapIns := "INSTANCE"
+ onapName := "ONAP"
+ policyName := "s3"
+ parsedDate, err := time.Parse("2006-01-02", "2024-02-12")
+ if err != nil {
+ fmt.Println("error in parsedDate")
+ }
+ currentDate := openapi_types.Date{Time: parsedDate}
+ currentDateTime, err := time.Parse(time.RFC3339, "2024-02-12T12:00:00Z")
+ if err != nil {
+ fmt.Println("error in currentDateTime")
+ }
+
+ inValidDecisionRequest := &oapicodegen.OPADecisionRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ }
+
+ var data []map[string]interface{}
+
+ data = nil
+
+ inValidRequest := &oapicodegen.OPADataUpdateRequest{
+ CurrentDate: &currentDate,
+ CurrentDateTime: &currentDateTime,
+ CurrentTime: &ctime,
+ OnapComponent: &onapComp,
+ OnapInstance: &onapIns,
+ OnapName: &onapName,
+ PolicyName: &policyName,
+ Data: &data,
+ }
+
+ inValidErr := []string{"TimeOffset is invalid or missing", "TimeZone is invalid or missing"}
+
+ inValidDecisionErrs := []string{"PolicyName is required and cannot be empty"}
+ tests := []struct {
+ name string
+ request interface{}
+ expectedErr []string
+ }{
+ {"Valid Request", inValidRequest, inValidErr},
+ {"Invalid OPADecisionRequest", inValidDecisionRequest, inValidDecisionErrs},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ errors := ValidateOPADataRequest(tt.request)
+ fmt.Printf("error : %s", errors)
+ fmt.Printf("error len : %d", len(errors))
+ assert.Equal(t, tt.expectedErr, errors)
+ })
+ }
+}