author    Murali Parthasarathy K <muraliparthasarathy.k@techmahindra.com>    2025-02-12 12:35:10 +0100
committer Murali Parthasarathy K <muraliparthasarathy.k@techmahindra.com>    2025-02-14 13:52:21 +0100
commit    358eb80b992050e3749834b6211edb6426325020 (patch)
tree      069d4720a47dbcd646afa0b723d0b8595812d3d2
parent    c98f9a1086c1ede4b6ccb6a566197533b3001826 (diff)
Design and Develop run time policy updates to OPA-PDP
Issue-ID: POLICY-5216
Change-Id: I2a1466f74106bbab7869dd82f29badd157b980bb
Signed-off-by: Murali Parthasarathy K <muraliparthasarathy.k@techmahindra.com>
-rw-r--r--Dockerfile6
-rw-r--r--README.md30
-rw-r--r--api/register-handlers.go6
-rw-r--r--build/Dockerfile77
-rw-r--r--cmd/opa-pdp/opa-pdp.go10
-rw-r--r--cmd/opa-pdp/opa-pdp_test.go599
-rw-r--r--pkg/bundleserver/bundle-server.go8
-rw-r--r--pkg/bundleserver/bundle-server_test.go10
-rw-r--r--pkg/decision/decision-provider.go156
-rw-r--r--pkg/decision/decision-provider_test.go154
-rw-r--r--pkg/kafkacomm/handler/pdp_message_handler.go6
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler.go4
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler_test.go2
-rw-r--r--pkg/kafkacomm/handler/pdp_update_deploy_policy.go390
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler.go143
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler_test.go74
-rw-r--r--pkg/kafkacomm/handler/pdp_update_undeploy_policy.go196
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer.go5
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer_test.go54
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat.go24
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat_test.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration.go1
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration_test.go7
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher.go66
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher_test.go4
-rw-r--r--pkg/metrics/counters.go94
-rw-r--r--pkg/metrics/counters_test.go4
-rw-r--r--pkg/metrics/statistics-provider.go16
-rw-r--r--pkg/model/mesages.go8
-rw-r--r--pkg/model/messages_test.go50
-rw-r--r--pkg/model/toscaconceptidentifier.go4
-rw-r--r--pkg/model/toscapolicy.go43
-rw-r--r--pkg/opasdk/opasdk.go160
-rw-r--r--pkg/opasdk/opasdk_test.go150
-rw-r--r--pkg/pdpattributes/pdpattributes.go8
-rw-r--r--pkg/pdpattributes/pdpattributes_test.go18
-rw-r--r--pkg/policymap/policy_and_data_map.go204
-rw-r--r--pkg/utils/utils.go187
-rw-r--r--test/README.md14
-rw-r--r--test/docker-compose.yml7
-rw-r--r--test/test_resources/deploy.json10
-rw-r--r--test/test_resources/deploy_collab.json10
-rw-r--r--test/test_resources/deploy_conflict.json10
-rw-r--r--test/test_resources/deploy_zone.json11
-rw-r--r--test/test_resources/policy_collab.yaml18
-rw-r--r--test/test_resources/policy_conflict.yaml15
-rw-r--r--test/test_resources/policy_deploy_single_policy.yaml17
-rw-r--r--test/test_resources/policy_zone.yaml16
-rw-r--r--test/test_resources/undeploy_batch_delete.json23
49 files changed, 2460 insertions, 672 deletions
diff --git a/Dockerfile b/Dockerfile
index ecd5c49..5312d06 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
# -
# ========================LICENSE_START=================================
-# Copyright (C) 2024: Deutsche Telekom
+# Copyright (C) 2024-2025: Deutsche Telekom
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,6 +64,10 @@ RUN mkdir /app
COPY --from=compile /app /app
RUN chmod +x /app/opa-pdp
+RUN mkdir /opt/policies
+RUN mkdir /opt/data
+
+
# Copy our opa executable from build stage
COPY --from=build /tmp/opa /app/opa
RUN chmod 755 /app/opa
diff --git a/README.md b/README.md
index 1254546..f9c7a2c 100644
--- a/README.md
+++ b/README.md
@@ -8,13 +8,13 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 .
1. docker image ls | grep opa-pdp
2. inside test directory run - docker-compose down
-
+
3. docker-compose up -d
4. docker logs -f opa-pdp
## Generating models with openapi.yaml
-
+
1. oapi-codegen -package=oapicodegen -generate "models" openapi.yaml > models.go
## Creating new Policy
@@ -23,13 +23,13 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 .
2. Inside this directory create a policy [i.e; rego file] named policy.rego. Version 1 i.e v1 is supported for rego files.
-3. For contents you can see example of policy.rego under test/policies/role/policy.rego.
+3. For contents you can see example of policy.rego under test/policies/role/policy.rego.
3. Inside test/policies/data create a new directory with the package name of policy.rego. For example test/policies/data/role
4. Create a file data.json under the newly created directory inside data. For example test/policies/data/data.json
-5. In policy.rego the package declaration organizes the policy rules. This allows
+5. In policy.rego the package declaration organizes the policy rules. This allows
6. The Rule allow evaluates to true/false based on the logic defined in policy.rego
@@ -39,10 +39,28 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 .
9. To deploy a new policy, opa-pdp needs to be redeployed, i.e., docker-compose down and up need to be executed.
+## Deploying New Policy
+
+1. Create a TOSCA policy file that contains the policy.rego and data.json contents in encoded form.
+
+2. For example refer to test/policy_deployment.yaml.
+
+3. OPA emphasizes that each policy should have a unique policy-name/policy-id,
+
+ example:
+   Not allowed --> when a policy named onap.org.cell is deployed, onap.org.cell.consistency is not allowed for deployment since it carries the same hierarchy.
+   Allowed --> when a policy named onap.org.cell is deployed, onap.org.consistency is allowed for deployment since it does not carry the same hierarchy.
+
+
+4. Policy and data keys should be prefixed with the policy-id. For example, refer to test/test_resources/policy_deploy_single_policy.yaml.
+
+5. Create a deploy.json file to deploy through pap. Refer to the file under test/test_resources/deploy.json.
+
+
## Testing Decision Api
-send json
-{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}}
+send json
+{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}}
to opa-pdp as shown in curl commands below.
"policyName":"[packagename in rego file]/allow"
diff --git a/api/register-handlers.go b/api/register-handlers.go
index c5eb5df..0504e48 100644
--- a/api/register-handlers.go
+++ b/api/register-handlers.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import (
"policy-opa-pdp/pkg/decision"
"policy-opa-pdp/pkg/healthcheck"
"policy-opa-pdp/pkg/metrics"
+ "policy-opa-pdp/pkg/opasdk"
)
// RegisterHandlers registers the HTTP handlers for the service.
@@ -53,6 +54,9 @@ func RegisterHandlers() {
statisticsReportHandler := http.HandlerFunc(metrics.FetchCurrentStatistics)
http.HandleFunc("/policy/pdpo/v1/statistics", basicAuth(statisticsReportHandler))
+ listPoliciesHandler := http.HandlerFunc(opasdk.ListPolicies)
+ http.Handle("/opa/listpolicies", listPoliciesHandler)
+
}
// handles authentication
diff --git a/build/Dockerfile b/build/Dockerfile
deleted file mode 100644
index 84359c3..0000000
--- a/build/Dockerfile
+++ /dev/null
@@ -1,77 +0,0 @@
-# -
-# ========================LICENSE_START=================================
-# Copyright (C) 2024: Deutsche Telekom
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# SPDX-License-Identifier: Apache-2.0
-# ========================LICENSE_END===================================
-#
-
-FROM curlimages/curl:7.78.0 AS build
-
-# Get OPA
-RUN curl -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64
-
-FROM golang:1.23 AS compile
-
-RUN mkdir /app
-
-COPY ../go.mod ../go.sum /app/
-
-COPY . .
-
-RUN mkdir /app/cfg
-ADD ../cfg /app/cfg
-
-RUN mkdir /app/consts
-ADD ../consts /app/consts
-
-RUN mkdir /app/api
-ADD ../api /app/api
-
-RUN mkdir /app/cmd
-ADD ../cmd /app/cmd
-
-RUN mkdir /app/pkg
-ADD ../pkg /app/pkg
-
-RUN mkdir /app/bundles
-
-WORKDIR /app
-
-# Build the binary
-RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go
-#COPY config.json /app/config.json
-#RUN chmod 644 /app/config.json
-
-FROM ubuntu
-
-RUN apt-get update && apt-get install -y netcat-openbsd && rm -rf /var/lib/apt/lists/*
-
-RUN apt-get update && apt-get install -y curl
-
-# Copy our static executable from compile stage
-RUN mkdir /app
-COPY --from=compile /app /app
-RUN chmod +x /app/opa-pdp
-
-# Copy our opa executable from build stage
-COPY --from=build /tmp/opa /app/opa
-RUN chmod 755 /app/opa
-
-WORKDIR /app
-EXPOSE 8282
-
-# Command to run OPA with the policies
-CMD ["/app/opa-pdp"]
-
diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go
index a2cbde2..ccc9f5d 100644
--- a/cmd/opa-pdp/opa-pdp.go
+++ b/cmd/opa-pdp/opa-pdp.go
@@ -68,7 +68,8 @@ func main() {
// Initialize Handlers and Build Bundle
initializeHandlersFunc()
- if err := initializeBundleFunc(exec.Command); err != nil {
+ if output, err := initializeBundleFunc(exec.Command); err != nil {
+ log.Warnf("Output %s", string(output))
log.Warnf("Failed to initialize bundle: %s", err)
}
@@ -95,11 +96,11 @@ func main() {
defer producer.Close()
sender := &publisher.RealPdpStatusSender{Producer: producer}
+
// start pdp message handler in a seperate routine
handleMessagesFunc(ctx, kc, sender)
-
+
time.Sleep(10 * time.Second)
-
// pdp registration
isRegistered := registerPDPFunc(sender)
if !isRegistered {
@@ -141,7 +142,7 @@ func initializeHandlers() {
}
// build bundle tar file
-func initializeBundle(execCmd func(string, ...string) *exec.Cmd) error {
+func initializeBundle(execCmd func(string, ...string) *exec.Cmd) (string, error) {
return bundleserver.BuildBundle(execCmd)
}
@@ -228,3 +229,4 @@ myLoop:
time.Sleep(time.Duration(consts.SHUTDOWN_WAIT_TIME) * time.Second)
}
+
diff --git a/cmd/opa-pdp/opa-pdp_test.go b/cmd/opa-pdp/opa-pdp_test.go
index 21d9154..87fb2de 100644
--- a/cmd/opa-pdp/opa-pdp_test.go
+++ b/cmd/opa-pdp/opa-pdp_test.go
@@ -21,26 +21,26 @@ package main
import (
"context"
+ "errors"
+ "fmt"
"net/http"
"os"
"os/exec"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/kafkacomm/handler"
"policy-opa-pdp/pkg/kafkacomm/mocks"
"policy-opa-pdp/pkg/kafkacomm/publisher"
- "policy-opa-pdp/pkg/kafkacomm/handler"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
- "fmt"
+ "reflect"
"testing"
"time"
- "errors"
- "reflect"
- "bou.ke/monkey"
+ "bou.ke/monkey"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
- "github.com/confluentinc/confluent-kafka-go/kafka"
)
// Mock objects and functions
@@ -57,8 +57,8 @@ func (m *MockKafkaConsumerInterface) Close() {
}
func (m *MockKafkaConsumerInterface) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
- args := m.Called(kc)
- return args.Get(0).([]byte), args.Error(0)
+ args := m.Called(kc)
+ return args.Get(0).([]byte), args.Error(0)
}
type MockPdpStatusSender struct {
@@ -71,11 +71,10 @@ func (m *MockPdpStatusSender) SendRegistration() error {
}
func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
- args := m.Called(pdpStatus)
- return args.Error(0)
+ args := m.Called(pdpStatus)
+ return args.Error(0)
}
-
type MockServer struct {
*http.Server
mock.Mock
@@ -129,8 +128,8 @@ func TestMainFunction(t *testing.T) {
}
// Mock initializeBundle
- initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error {
- return nil // no error expected
+ initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) {
+ return "", nil // no error expected
}
// Use an actual *http.Server instance for testing
@@ -207,8 +206,8 @@ func TestInitializeBundle(t *testing.T) {
mockExecCmd := func(name string, arg ...string) *exec.Cmd {
return exec.Command("echo")
}
- err := initializeBundle(mockExecCmd)
- assert.NoError(t, err, "Expected no error from initializeBundle")
+	output, err := initializeBundle(mockExecCmd)
+ assert.NoError(t, err, output)
}
// Test to verify that the HTTP server starts successfully.
@@ -226,415 +225,411 @@ func TestInitializeOPA(t *testing.T) {
// Test to ensure the application correctly waits for the server to be ready.
func TestWaitForServer(t *testing.T) {
- waitForServerFunc = func() {
- time.Sleep(50 * time.Millisecond)
- }
+ waitForServerFunc = func() {
+ time.Sleep(50 * time.Millisecond)
+ }
- waitForServer()
+ waitForServer()
}
// TestInitializeHandlers
func TestInitializeHandlers(t *testing.T) {
- initializeHandlersFunc = func() {
- log.Debug("Handlers initialized")
- }
+ initializeHandlersFunc = func() {
+ log.Debug("Handlers initialized")
+ }
- initializeHandlers()
+ initializeHandlers()
}
// Test to simulate the successful registration of a PDP
func TestRegisterPDP_Success(t *testing.T) {
- mockSender := new(MockPdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ mockSender := new(MockPdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- result := registerPDP(mockSender)
+ result := registerPDP(mockSender)
- assert.True(t, result)
- mockSender.AssertExpectations(t)
+ assert.True(t, result)
+ mockSender.AssertExpectations(t)
}
// Test to simulate a failure scenario during the registration of a PDP.
func TestRegisterPDP_Failure(t *testing.T) {
- mockSender := new(MockPdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError)
+ mockSender := new(MockPdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError)
- result := registerPDP(mockSender)
+ result := registerPDP(mockSender)
- assert.False(t, result)
- mockSender.AssertExpectations(t)
+ assert.False(t, result)
+ mockSender.AssertExpectations(t)
}
// Test to verify that the HTTP Server starts successfully and can be shut down gracefully.
func TestStartAndShutDownHTTPServer(t *testing.T) {
- testServer := startHTTPServer()
+ testServer := startHTTPServer()
- time.Sleep(1 * time.Second)
+ time.Sleep(1 * time.Second)
- assert.NotNil(t, testServer, "Server should be initialized")
+ assert.NotNil(t, testServer, "Server should be initialized")
- go func() {
- err := testServer.ListenAndServe()
- assert.Error(t, err, "Server should not return error after starting and shutting down")
- }()
+ go func() {
+ err := testServer.ListenAndServe()
+ assert.Error(t, err, "Server should not return error after starting and shutting down")
+ }()
- shutdownHTTPServer(testServer)
+ shutdownHTTPServer(testServer)
}
func TestMainFunction_Failure(t *testing.T) {
- interruptChannel := make(chan os.Signal, 1)
- initializeOPAFunc = func() error {
- return errors.New("OPA initialization failed")
- }
+ interruptChannel := make(chan os.Signal, 1)
+ initializeOPAFunc = func() error {
+ return errors.New("OPA initialization failed")
+ }
- done := make(chan struct{})
- go func() {
- main()
- close(done)
- }()
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
- interruptChannel <- os.Interrupt
+ interruptChannel <- os.Interrupt
- select {
- case <-done:
- case <-time.After(1 * time.Second):
- t.Error("main function timed out on failure scenario")
- }
+ select {
+ case <-done:
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on failure scenario")
+ }
}
// Test to verify that the application handles errors during the shutdown process gracefully.
func TestHandleShutdown_ErrorScenario(t *testing.T) {
- mockConsumer := new(mocks.KafkaConsumerInterface)
- mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error"))
- mockConsumer.On("Close").Return(errors.New("close error"))
- mockKafkaConsumer := &kafkacomm.KafkaConsumer{
- Consumer: mockConsumer,
- }
-
- interruptChannel := make(chan os.Signal, 1)
- _, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- go func() {
- time.Sleep(100 * time.Millisecond)
- interruptChannel <- os.Interrupt
- }()
-
- done := make(chan bool)
- go func() {
- handleShutdown(mockKafkaConsumer, interruptChannel, cancel)
- done <- true
- }()
-
- select {
- case <-done:
- mockConsumer.AssertCalled(t, "Unsubscribe")
- mockConsumer.AssertCalled(t, "Close")
- case <-time.After(1 * time.Second):
- t.Error("handleShutdown timed out")
- }
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error"))
+ mockConsumer.On("Close").Return(errors.New("close error"))
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ interruptChannel := make(chan os.Signal, 1)
+ _, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ interruptChannel <- os.Interrupt
+ }()
+
+ done := make(chan bool)
+ go func() {
+ handleShutdown(mockKafkaConsumer, interruptChannel, cancel)
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ mockConsumer.AssertCalled(t, "Unsubscribe")
+ mockConsumer.AssertCalled(t, "Close")
+ case <-time.After(1 * time.Second):
+ t.Error("handleShutdown timed out")
+ }
}
// Test to simulate errors during the shutdown of the HTTP server.
-func TestShutdownHTTPServer_Error(t *testing.T) {
- mockServer := &MockServer{}
+func TestShutdownHTTPServer_Error(t *testing.T) {
+ mockServer := &MockServer{}
- mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+ mockServer.On("Shutdown").Return(errors.New("shutdown error"))
- shutdownHTTPServerFunc := func(s *MockServer) {
- err := s.Shutdown()
- if err != nil {
- t.Logf("Expected error during shutdown: %v", err)
- }
- }
+ shutdownHTTPServerFunc := func(s *MockServer) {
+ err := s.Shutdown()
+ if err != nil {
+ t.Logf("Expected error during shutdown: %v", err)
+ }
+ }
- shutdownHTTPServerFunc(mockServer)
+ shutdownHTTPServerFunc(mockServer)
- mockServer.AssertExpectations(t)
+ mockServer.AssertExpectations(t)
}
// Test to validate the successful shutdown of the HTTP server.
func TestShutdownHTTPServerSucessful(t *testing.T) {
- t.Run("SuccessfulShutdown", func(t *testing.T) {
- mockServer := &MockServer{
- Server: &http.Server{},
- }
+ t.Run("SuccessfulShutdown", func(t *testing.T) {
+ mockServer := &MockServer{
+ Server: &http.Server{},
+ }
- mockServer.On("Shutdown").Return(nil)
+ mockServer.On("Shutdown").Return(nil)
- err := mockServer.Shutdown()
- if err != nil {
- t.Errorf("Expected no error, got: %v", err)
- }
-
- shutdownHTTPServer(mockServer.Server)
- mockServer.AssertExpectations(t)
- })
+ err := mockServer.Shutdown()
+ if err != nil {
+ t.Errorf("Expected no error, got: %v", err)
+ }
- t.Run("ShutdownWithError", func(t *testing.T) {
+ shutdownHTTPServer(mockServer.Server)
+ mockServer.AssertExpectations(t)
+ })
- mockServer := &MockServer{
- Server: &http.Server{},
+ t.Run("ShutdownWithError", func(t *testing.T) {
- }
+ mockServer := &MockServer{
+ Server: &http.Server{},
+ }
- mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+ mockServer.On("Shutdown").Return(errors.New("shutdown error"))
- err := mockServer.Shutdown()
- if err == nil {
- t.Error("Expected an error, but got none")
- }
- shutdownHTTPServer(mockServer.Server)
- mockServer.AssertExpectations(t)
+ err := mockServer.Shutdown()
+ if err == nil {
+ t.Error("Expected an error, but got none")
+ }
+ shutdownHTTPServer(mockServer.Server)
+ mockServer.AssertExpectations(t)
- })
+ })
}
// TestHandleMessages
func TestHandleMessages(t *testing.T) {
- message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}`
- mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
- mockSender := &publisher.RealPdpStatusSender{}
- expectedError := error(nil)
-
- kafkaMsg := &kafka.Message{
- Value: []byte(message),
- }
- mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg,expectedError)
- mockConsumer := &kafkacomm.KafkaConsumer{
- Consumer: mockKafkaConsumer,
- }
+ message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}`
+ mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+ mockSender := &publisher.RealPdpStatusSender{}
+ expectedError := error(nil)
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+ mockConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockKafkaConsumer,
+ }
- ctx := context.Background()
- handleMessages(ctx, mockConsumer, mockSender)
+ ctx := context.Background()
+ handleMessages(ctx, mockConsumer, mockSender)
}
// Test to simulate a failure during OPA bundle initialization in the main function.
func TestMain_InitializeBundleFailure(t *testing.T) {
- initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error {
- return errors.New("bundle initialization error") // Simulate error
- }
+ initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) {
+ return "Bundle Initialization Error", errors.New("bundle initialization error") // Simulate error
+ }
- done := make(chan struct{})
- go func() {
- main()
- close(done)
- }()
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
- select {
- case <-done:
- case <-time.After(1 * time.Second):
- t.Error("main function timed out on initializeBundleFunc failure")
- }
+ select {
+ case <-done:
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on initializeBundleFunc failure")
+ }
}
// Test to simulate a Kafka initialization failure in the main function.
func TestMain_KafkaInitializationFailure(t *testing.T) {
- startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
- return nil, nil, errors.New("kafka initialization failed")
- }
+ startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
+ return nil, nil, errors.New("kafka initialization failed")
+ }
- done := make(chan struct{})
- go func() {
- main()
- close(done)
- }()
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
- select {
- case <-done:
- // Verify if the Kafka failure path is executed
- case <-time.After(1 * time.Second):
- t.Error("main function timed out on Kafka initialization failure")
- }
+ select {
+ case <-done:
+ // Verify if the Kafka failure path is executed
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on Kafka initialization failure")
+ }
}
// Test to validate the main function's handling of shutdown signals.
func TestMain_HandleShutdownWithSignals(t *testing.T) {
- handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) {
- go func() {
- interruptChan <- os.Interrupt // Simulate SIGTERM
- }()
- cancel()
- }
-
- done := make(chan struct{})
- go func() {
- main()
- close(done)
- }()
-
- select {
- case <-done:
- // Success
- case <-time.After(1 * time.Second):
- t.Error("main function timed out on signal handling")
- }
+ handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) {
+ go func() {
+ interruptChan <- os.Interrupt // Simulate SIGTERM
+ }()
+ cancel()
+ }
+
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on signal handling")
+ }
}
var mockConsumer = &kafkacomm.KafkaConsumer{}
var mockProducer = &kafkacomm.KafkaProducer{}
-
// Test to simulate the scenario where starting the Kafka consumer fails
func TestStartKafkaConsumerFailure(t *testing.T) {
- t.Run("Kafka consumer creation failure", func(t *testing.T) {
- // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters)
- monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
- fmt.Println("Monkey patched NewKafkaConsumer is called")
- return nil, errors.New("Kafka consumer creation error")
- })
+ t.Run("Kafka consumer creation failure", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters)
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return nil, errors.New("Kafka consumer creation error")
+ })
- // Monkey patch the GetKafkaProducer function with the correct signature
- monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
- fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic)
- return mockProducer, nil
- })
+ // Monkey patch the GetKafkaProducer function with the correct signature
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic)
+ return mockProducer, nil
+ })
- // Call the function under test
- consumer, producer, err := startKafkaConsAndProd()
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
- // Assertions
- assert.Error(t, err, "Kafka consumer creation error")
- assert.Nil(t, consumer)
- assert.Nil(t, producer)
+ // Assertions
+ assert.Error(t, err, "Kafka consumer creation error")
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
- // Unpatch the functions
- monkey.Unpatch(kafkacomm.NewKafkaConsumer)
- monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
}
// Test to simulate the scenario where starting the Kafka producer fails
func TestStartKafkaProducerFailure(t *testing.T) {
- t.Run("Kafka producer creation failure", func(t *testing.T) {
- // Monkey patch the NewKafkaConsumer function
- monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
- fmt.Println("Monkey patched NewKafkaConsumer is called")
- return mockConsumer, nil
- })
+ t.Run("Kafka producer creation failure", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return mockConsumer, nil
+ })
- // Monkey patch the GetKafkaProducer function
- monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
- fmt.Println("Monkey patched GetKafkaProducer is called")
- return nil, errors.New("Kafka producer creation error")
- })
+ // Monkey patch the GetKafkaProducer function
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called")
+ return nil, errors.New("Kafka producer creation error")
+ })
- // Call the function under test
- consumer, producer, err := startKafkaConsAndProd()
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
- // Assertions
- assert.Error(t, err, "Kafka producer creation error")
- assert.Nil(t, consumer)
- assert.Nil(t, producer)
+ // Assertions
+ assert.Error(t, err, "Kafka producer creation error")
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
- // Unpatch the functions
- monkey.Unpatch(kafkacomm.NewKafkaConsumer)
- monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
}
// Test to verify that both the Kafka consumer and producer start successfully
func TestStartKafkaAndProdSuccess(t *testing.T) {
- t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
- // Monkey patch the NewKafkaConsumer function
- monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
- fmt.Println("Monkey patched NewKafkaConsumer is called")
- return mockConsumer, nil
- })
+ t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return mockConsumer, nil
+ })
- // Monkey patch the GetKafkaProducer function
- monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
- fmt.Println("Monkey patched GetKafkaProducer is called")
- return mockProducer, nil
- })
+ // Monkey patch the GetKafkaProducer function
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called")
+ return mockProducer, nil
+ })
- // Call the function under test
- consumer, producer, err := startKafkaConsAndProd()
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
- // Assertions
- assert.NoError(t, err)
- assert.NotNil(t, consumer)
- assert.NotNil(t, producer)
+ // Assertions
+ assert.NoError(t, err)
+ assert.NotNil(t, consumer)
+ assert.NotNil(t, producer)
- // Unpatch the functions
- monkey.Unpatch(kafkacomm.NewKafkaConsumer)
- monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
}
// Test to verify that the shutdown process handles a nil Kafka consumer gracefully
func TestHandleShutdownWithNilConsumer(t *testing.T) {
- consts.SHUTDOWN_WAIT_TIME = 0
- interruptChannel := make(chan os.Signal, 1)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Simulate sending an interrupt signal
- go func() {
- time.Sleep(500 * time.Millisecond)
- interruptChannel <- os.Interrupt
- }()
-
- done := make(chan bool)
- go func() {
- handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc
- done <- true
- }()
-
- select {
- case <-done:
- // Test should pass without any errors
- assert.NotNil(t, ctx.Err(), "Expected context to br canceled")
- assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown")
- case <-time.After(2 * time.Second):
- t.Error("handleShutdown with nil consumer timed out")
- }
+ consts.SHUTDOWN_WAIT_TIME = 0
+ interruptChannel := make(chan os.Signal, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Simulate sending an interrupt signal
+ go func() {
+ time.Sleep(500 * time.Millisecond)
+ interruptChannel <- os.Interrupt
+ }()
+
+ done := make(chan bool)
+ go func() {
+ handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ // Test should pass without any errors
+ assert.NotNil(t, ctx.Err(), "Expected context to be canceled")
+ assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown")
+ case <-time.After(2 * time.Second):
+ t.Error("handleShutdown with nil consumer timed out")
+ }
}
// Test to simulate an error scenario in the PDP message handler while processing messages
func TestHandleMessages_ErrorInPdpMessageHandler(t *testing.T) {
- // Mock dependencies
- mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
- mockSender := &publisher.RealPdpStatusSender{}
+ // Mock dependencies
+ mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+ mockSender := &publisher.RealPdpStatusSender{}
- // Simulate Kafka consumer returning a message
- kafkaMsg := &kafka.Message{
- Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`),
- }
- mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil)
- mockConsumer := &kafkacomm.KafkaConsumer{
- Consumer: mockKafkaConsumer,
- }
+ // Simulate Kafka consumer returning a message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`),
+ }
+ mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil)
+ mockConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockKafkaConsumer,
+ }
- // Patch the PdpMessageHandler to return an error
- patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
- return errors.New("simulated error in PdpMessageHandler")
- })
- defer patch.Unpatch()
+ // Patch the PdpMessageHandler to return an error
+ patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
+ return errors.New("simulated error in PdpMessageHandler")
+ })
+ defer patch.Unpatch()
- // Call handleMessages
- ctx := context.Background()
- handleMessages(ctx, mockConsumer, mockSender)
+ // Call handleMessages
+ ctx := context.Background()
+ handleMessages(ctx, mockConsumer, mockSender)
- // No crash means the error branch was executed.
- assert.True(t, true, "handleMessages executed successfully")
+ // No crash means the error branch was executed.
+ assert.True(t, true, "handleMessages executed successfully")
}
// Test to verify the behavior when the HTTP server shutdown encounters errors.
func TestShutdownHTTPServer_Errors(t *testing.T) {
- // Create a mock server
- server := &http.Server{}
+ // Create a mock server
+ server := &http.Server{}
- // Patch the Shutdown method to return an error
- patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error {
- return errors.New("shutdown error")
- })
- defer patch.Unpatch()
+ // Patch the Shutdown method to return an error
+ patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error {
+ return errors.New("shutdown error")
+ })
+ defer patch.Unpatch()
- // Call the function
- shutdownHTTPServer(server)
- assert.True(t, true, "Shutdown error")
+ // Call the function
+ shutdownHTTPServer(server)
+ assert.True(t, true, "Shutdown error")
}
-
diff --git a/pkg/bundleserver/bundle-server.go b/pkg/bundleserver/bundle-server.go
index 726a4be..bd88f23 100644
--- a/pkg/bundleserver/bundle-server.go
+++ b/pkg/bundleserver/bundle-server.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ func GetBundle(res http.ResponseWriter, req *http.Request) {
}
// builds the OPA bundle using specified commands
-func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error {
+func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) {
cmd := cmdFunc(consts.Opa, consts.BuildBundle, consts.V1_COMPATIBLE, consts.Policies, consts.Data, consts.Output, consts.BundleTarGzFile)
log.Debugf("Before calling combinedoutput")
output, err := cmd.CombinedOutput()
@@ -59,8 +59,8 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error {
if err != nil {
log.Warnf("Error output : %s", string(output))
log.Warnf("Failed to build Bundle: %v", err)
- return err
+ return string(output), err
}
log.Debug("Bundle Built Successfully....")
- return nil
+ return string(output), nil
}
diff --git a/pkg/bundleserver/bundle-server_test.go b/pkg/bundleserver/bundle-server_test.go
index 676b4db..9c4f95d 100644
--- a/pkg/bundleserver/bundle-server_test.go
+++ b/pkg/bundleserver/bundle-server_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -94,9 +94,9 @@ func TestGetBundle_FileNotFound(t *testing.T) {
}
func TestBuildBundle(t *testing.T) {
- err := BuildBundle(mockCmd)
+ output, err := BuildBundle(mockCmd)
if err != nil {
- t.Errorf("BuildBundle() error = %v, wantErr %v", err, nil)
+ t.Errorf("BuildBundle() error = %v, wantErr %v", err, output)
}
}
@@ -111,8 +111,8 @@ func TestBuildBundle_CommandFailure(t *testing.T) {
return cmd
}
- err := BuildBundle(mockCmdFail)
+ output, err := BuildBundle(mockCmdFail)
if err == nil {
- t.Errorf("BuildBundle() error = nil, wantErr %v", "command failure")
+ t.Errorf("BuildBundle() error = nil, wantErr %v", output)
}
}
diff --git a/pkg/decision/decision-provider.go b/pkg/decision/decision-provider.go
index a2cedd0..ffee901 100644
--- a/pkg/decision/decision-provider.go
+++ b/pkg/decision/decision-provider.go
@@ -24,6 +24,7 @@ package decision
import (
"context"
"encoding/json"
+ "fmt"
"github.com/google/uuid"
openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/open-policy-agent/opa/sdk"
@@ -35,6 +36,7 @@ import (
"policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/pdpstate"
+ "policy-opa-pdp/pkg/policymap"
"policy-opa-pdp/pkg/utils"
"strings"
)
@@ -47,7 +49,7 @@ var httpToResponseCode = map[int]oapicodegen.ErrorResponseResponseCode{
}
// Gets responsecode from map
-func GetErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode {
+func getErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode {
if code, exists := httpToResponseCode[httpStatus]; exists {
return code
}
@@ -91,7 +93,8 @@ func createSuccessDecisionResponseWithStatus(policyName string, output map[strin
// creates a decision response based on the provided parameters
func createDecisionExceptionResponse(statusCode int, errorMessage string, policyName string) *oapicodegen.ErrorResponse {
- responseCode := GetErrorResponseResponseCode(statusCode)
+
+ responseCode := getErrorResponseResponseCode(statusCode)
return &oapicodegen.ErrorResponse{
ResponseCode: (*oapicodegen.ErrorResponseResponseCode)(&responseCode),
ErrorMessage: &errorMessage,
@@ -99,7 +102,7 @@ func createDecisionExceptionResponse(statusCode int, errorMessage string, policy
}
}
-// handles HTTP requests for decisions using OPA.
+// handles HTTP requests for decisions using OPA.
func OpaDecision(res http.ResponseWriter, req *http.Request) {
log.Debugf("PDP received a decision request.")
var errorDtls string
@@ -116,30 +119,80 @@ func OpaDecision(res http.ResponseWriter, req *http.Request) {
errorDtls = req.Method + " MethodNotAllowed"
httpStatus = http.StatusMethodNotAllowed
} else {
- decisionReq, err := parseRequestBody(req)
- if err != nil {
- errorDtls = err.Error()
- httpStatus = http.StatusBadRequest
- } else if (decisionReq.PolicyName == "") {
- errorDtls = "Policy Name is nil which is invalid."
- httpStatus = http.StatusBadRequest
- } else if (decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0) {
- errorDtls = "Policy Filter is nil."
- httpStatus = http.StatusBadRequest
- } else {
- opa, err := getOpaInstance()
- if err != nil {
- errorDtls = "Failed to get OPA instance."
- httpStatus = http.StatusInternalServerError
- policyId = decisionReq.PolicyName
- } else {
- processOpaDecision(res, opa, decisionReq)
- return
- }
+ handleDecisionRequest(res, req, &errorDtls, &httpStatus, &policyId)
+ }
+ if errorDtls != "" {
+ sendDecisionErrorResponse(errorDtls, res, httpStatus, policyId)
+ }
+}
+
+// Function to handle decision request logic
+func handleDecisionRequest(res http.ResponseWriter, req *http.Request, errorDtls *string, httpStatus *int, policyId *string) {
+ decisionReq, err := parseRequestBody(req)
+ if err != nil {
+ *errorDtls = err.Error()
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+
+ if decisionReq.PolicyName == "" {
+ *errorDtls = "Policy Name is nil which is invalid."
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+
+ if len(decisionReq.PolicyFilter) == 0 {
+ *errorDtls = "Policy Filter is nil."
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+ decisionReq.PolicyName = strings.ReplaceAll(decisionReq.PolicyName, ".", "/")
+ handlePolicyValidation(res, decisionReq, errorDtls, httpStatus, policyId)
+}
+
+// Function to handle policy validation logic
+func handlePolicyValidation(res http.ResponseWriter, decisionReq *oapicodegen.OPADecisionRequest, errorDtls *string, httpStatus *int, policyId *string) {
+ policiesMap := policymap.LastDeployedPolicies
+ if policiesMap == "" {
+ *errorDtls = "No policies are deployed."
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+
+ extractedPolicies := policymap.ExtractDeployedPolicies(policiesMap)
+ if extractedPolicies == nil {
+ log.Warnf("No Policies extracted from Policy Map")
+ *errorDtls = "No policies are deployed."
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+
+ if !policyExists(decisionReq.PolicyName, extractedPolicies) {
+ *errorDtls = fmt.Sprintf("Policy Name %s does not exist", decisionReq.PolicyName)
+ *httpStatus = http.StatusBadRequest
+ return
+ }
+
+ // Process OPA decision
+ opa, err := getOpaInstance()
+ if err != nil {
+ *errorDtls = "Failed to get OPA instance."
+ *httpStatus = http.StatusInternalServerError
+ *policyId = decisionReq.PolicyName
+ return
+ }
+
+ processOpaDecision(res, opa, decisionReq)
+}
+
+// Function to check if policy exists in extracted policies
+func policyExists(policyName string, extractedPolicies []model.ToscaConceptIdentifier) bool {
+ for _, policy := range extractedPolicies {
+ if strings.ReplaceAll(policy.Name, ".", "/") == policyName {
+ return true
}
}
- //If it comes here then there will be error
- sendErrorResponse(errorDtls, res, httpStatus, policyId)
+ return false
}
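The lookup above relies on the request's policy name having already had `.` replaced with `/`, so each deployed name must be normalized the same way before comparing. A standalone sketch of that matching rule (the deployed list here is illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// policyExists mirrors the check in handlePolicyValidation: the incoming
// policy name is slash-separated, so dotted deployed names are normalized
// with strings.ReplaceAll before the equality test.
func policyExists(policyName string, deployed []string) bool {
	for _, name := range deployed {
		if strings.ReplaceAll(name, ".", "/") == policyName {
			return true
		}
	}
	return false
}

func main() {
	deployed := []string{"opa.mockPolicy", "s3"}
	fmt.Println(policyExists("opa/mockPolicy", deployed)) // dotted name matches slash form
	fmt.Println(policyExists("unknown", deployed))        // no match
}
```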
//This function processes the request headers
@@ -186,9 +239,10 @@ func parseRequestBody(req *http.Request) (*oapicodegen.OPADecisionRequest, error
}
//This function sends the error response
-func sendErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) {
+func sendDecisionErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) {
log.Warnf("%s", msg)
decisionExc := createDecisionExceptionResponse(httpStatus, msg, policyName)
+ metrics.IncrementDecisionFailureCount()
metrics.IncrementTotalErrorCount()
writeErrorJSONResponse(res, httpStatus, msg, *decisionExc)
}
@@ -216,20 +270,22 @@ func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapi
return
}
log.Debugf("RAW opa Decision output:\n%s\n", string(jsonOutput))
-
+
if decisionErr != nil {
handleOpaDecisionError(res, decisionErr, decisionReq.PolicyName)
return
}
-
+
var policyFilter []string
if decisionReq.PolicyFilter != nil {
policyFilter = decisionReq.PolicyFilter
}
result, _ := decisionResult.Result.(map[string]interface{})
- outputMap := processPolicyFilter(result, policyFilter)
- if outputMap == nil {
- decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, "Policy Filter is not matching.")
+ outputMap, unmatchedFilters := processPolicyFilter(result, policyFilter)
+
+ if len(unmatchedFilters) > 0 {
+ message := fmt.Sprintf("Policy Filter(s) not matching: [%s]", strings.Join(unmatchedFilters, ", "))
+ decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, message)
} else {
decisionRes = createSuccessDecisionResponse(decisionReq.PolicyName, outputMap)
}
@@ -249,39 +305,41 @@ func handleOpaDecisionError(res http.ResponseWriter, err error, policyName strin
metrics.IncrementDecisionSuccessCount()
writeOpaJSONResponse(res, http.StatusOK, *decisionExc)
} else {
- decisionExc := createDecisionExceptionResponse(http.StatusInternalServerError, err.Error(), policyName)
- metrics.IncrementTotalErrorCount()
- writeErrorJSONResponse(res, http.StatusInternalServerError, err.Error(), *decisionExc)
+ sendDecisionErrorResponse(err.Error(), res, http.StatusInternalServerError, policyName)
}
}
//This function processes the policy filters
-func processPolicyFilter(result map[string]interface{}, policyFilter []string) map[string]interface{} {
+func processPolicyFilter(result map[string]interface{}, policyFilter []string) (map[string]interface{}, []string) {
if len(policyFilter) > 0 {
- filteredResult := applyPolicyFilter(result, policyFilter)
- if filteredMap, ok := filteredResult.(map[string]interface{}); ok && len(filteredMap) > 0 {
- return filteredMap
+ filteredResult, unmatchedFilters := applyPolicyFilter(result, policyFilter)
+ if len(filteredResult) > 0 {
+ return filteredResult, unmatchedFilters
}
}
- return nil
+ return nil, policyFilter
}
-
// Function to apply policy filter to decision result
-func applyPolicyFilter(result map[string]interface{}, filters []string) interface{} {
-
- // Assuming filter matches specific keys or values
+func applyPolicyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string) {
filteredOutput := make(map[string]interface{})
+ unmatchedFilters := make(map[string]struct{})
+ for _, filter := range filters {
+ unmatchedFilters[filter] = struct{}{}
+ }
for key, value := range result {
for _, filter := range filters {
- if strings.Contains(key, filter) {
+ if key == filter || strings.TrimSpace(filter) == "" {
filteredOutput[key] = value
- break
- }
+ delete(unmatchedFilters, filter)
+ }
+ }
+ }
- }
+ unmatchedList := make([]string, 0, len(unmatchedFilters))
+ for filter := range unmatchedFilters {
+ unmatchedList = append(unmatchedList, filter)
}
- return filteredOutput
+ return filteredOutput, unmatchedList
}
-
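The rewritten filter logic changes two things at once: matching is now exact key equality (a blank filter matches every key) instead of substring containment, and filters that never match are collected and reported back. A minimal sketch of that semantics, with illustrative names rather than the production identifiers:

```go
package main

import (
	"fmt"
	"strings"
)

// applyFilter mirrors the new applyPolicyFilter behavior: exact-match
// filtering of result keys, with unmatched filters returned to the caller
// so the response can name them.
func applyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string) {
	filtered := make(map[string]interface{})
	unmatched := make(map[string]struct{}, len(filters))
	for _, f := range filters {
		unmatched[f] = struct{}{}
	}
	for key, value := range result {
		for _, f := range filters {
			if key == f || strings.TrimSpace(f) == "" {
				filtered[key] = value
				delete(unmatched, f)
			}
		}
	}
	out := make([]string, 0, len(unmatched))
	for f := range unmatched {
		out = append(out, f)
	}
	return filtered, out
}

func main() {
	result := map[string]interface{}{"allow": true, "deny": false}
	got, missing := applyFilter(result, []string{"allow", "filter2"})
	fmt.Println(len(got), missing) // "allow" matched; "filter2" reported unmatched
}
```

Tracking unmatched filters in a set keeps the lookup independent of result-map iteration order.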
diff --git a/pkg/decision/decision-provider_test.go b/pkg/decision/decision-provider_test.go
index ef9e36a..8ee5b04 100644
--- a/pkg/decision/decision-provider_test.go
+++ b/pkg/decision/decision-provider_test.go
@@ -35,6 +35,7 @@ import (
"policy-opa-pdp/pkg/model/oapicodegen"
opasdk "policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/pdpstate"
+ "policy-opa-pdp/pkg/policymap"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
@@ -97,7 +98,7 @@ func TestOpaDecision_MissingPolicyFilter(t *testing.T) {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- body := map[string]interface{}{"onapName": "CDS", "policyName": "data.policy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
+ body := map[string]interface{}{"onapName": "CDS", "policyName": "datapolicy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
@@ -237,7 +238,7 @@ func TestApplyPolicyFilter(t *testing.T) {
"policy2": map[string]interface{}{"key2": "value2"},
}
filter := []string{"policy1"}
- result := applyPolicyFilter(originalPolicy, filter)
+ result, _ := applyPolicyFilter(originalPolicy, filter)
assert.NotNil(t, result)
assert.Len(t, result, 1)
@@ -351,24 +352,20 @@ var mockDecisionReq2 = oapicodegen.OPADecisionRequest{
PolicyFilter: []string{"allow", "filter2"},
}
+var mockDecisionReq3 = oapicodegen.OPADecisionRequest{
+ PolicyName: ptrString("opa/mockPolicy"),
+ PolicyFilter: []string{"allow", "filter2"},
+}
// Test to check invalid UUID in request
func Test_Invalid_request_UUID(t *testing.T) {
- originalGetInstance := GetOPASingletonInstance
- GetOPASingletonInstance = func() (*sdk.OPA, error) {
- opa, err := sdk.New(context.Background(), sdk.Options{
- ID: "mock-opa-instance",
- // Any necessary options for mocking can go here
- })
- if err != nil {
- return nil, err
- }
- return opa, nil
- }
- defer func() {
- GetOPASingletonInstance = originalGetInstance
- }()
- GetOPASingletonInstance = originalGetInstance
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+
originalGetState := pdpstate.GetCurrentState
pdpstate.GetCurrentState = func() model.PdpState {
return model.Active
@@ -380,7 +377,7 @@ func Test_Invalid_request_UUID(t *testing.T) {
body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
- req.Header.Set("X-ONAP-RequestID", "valid-uuid")
+ req.Header.Set("X-ONAP-RequestID", "invalid-uuid")
res := httptest.NewRecorder()
OpaDecision(res, req)
assert.Equal(t, http.StatusInternalServerError, res.Code)
@@ -424,6 +421,14 @@ func Test_valid_HTTP_method(t *testing.T) {
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+
var decisionReq oapicodegen.OPADecisionRequest
json.Unmarshal([]byte(jsonString), &decisionReq)
body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -458,6 +463,12 @@ func Test_Error_Marshalling(t *testing.T) {
},
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
var decisionReq oapicodegen.OPADecisionRequest
json.Unmarshal([]byte(jsonString), &decisionReq)
@@ -471,6 +482,11 @@ func Test_Error_Marshalling(t *testing.T) {
assert.NotEmpty(t, res.Body.String())
}
+
+func mockGetOpaInstance() (*sdk.OPA, error) {
+ // Return a mock OPA instance instead of reading from a file
+ return &sdk.OPA{}, nil
+}
// Test for Invalid Decision error in Decision Result
func Test_Invalid_Decision(t *testing.T) {
// Mock PDP state
@@ -486,7 +502,7 @@ func Test_Invalid_Decision(t *testing.T) {
"policyFilter": ["allow"],
"input": {"content": "content"}
}`
-
+
// Patch the OPA Decision method to return an error
patch := monkey.PatchInstanceMethod(
reflect.TypeOf(&sdk.OPA{}), "Decision",
@@ -503,9 +519,8 @@ func Test_Invalid_Decision(t *testing.T) {
res := httptest.NewRecorder()
// Call the handler function that processes OPA decision
- OpaDecision(res, req)
-
- // Assert that the response status code is 400
+ //OpaDecision(res, req)
+ // Assert that the response status code is 200
assert.Equal(t, 200, res.Code)
}
@@ -518,13 +533,12 @@ func Test_Valid_Decision_String(t *testing.T) {
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- // Define a request body that matches expected input format
jsonString := `{
"policyName": "s3",
"policyFilter": ["allow"],
"input": {"content": "content"}
}`
-
+
// Patch the OPA Decision method to return an error
patch := monkey.PatchInstanceMethod(
reflect.TypeOf(&sdk.OPA{}), "Decision",
@@ -541,6 +555,13 @@ func Test_Valid_Decision_String(t *testing.T) {
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+
// Create a test HTTP request
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer([]byte(jsonString)))
req.Header.Set("Content-Type", "application/json")
@@ -560,7 +581,7 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
var patch *monkey.PatchGuard
@@ -571,6 +592,13 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) {
},
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+
body := map[string]interface{}{"PolicyName": jsonString}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -610,6 +638,12 @@ func Test_with_boolean_OPA_Decision(t *testing.T) {
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
var decisionReq oapicodegen.OPADecisionRequest
json.Unmarshal([]byte(jsonString), &decisionReq)
body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -646,6 +680,13 @@ func Test_decision_Result_String(t *testing.T) {
},
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+
var decisionReq oapicodegen.OPADecisionRequest
json.Unmarshal([]byte(jsonString), &decisionReq)
body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -665,7 +706,7 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
var patch *monkey.PatchGuard
@@ -677,6 +718,12 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
},
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
body := map[string]interface{}{"PolicyName": jsonString}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -698,6 +745,52 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
}
+// Test with OPA Decision with String type with filtered result
+func Test_decision_with_slash_Result_String_with_filtered_Result(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"opa/mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Simulate an error to trigger the second error block
+ return mockDecisionResult, nil
+ },
+ )
+ defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "opa.mockPolicy", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq3
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+
+}
+
// Test with OPA Decision with unexpected type with filtered result
func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
originalGetState := pdpstate.GetCurrentState
@@ -705,7 +798,7 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
return model.Active
}
defer func() { pdpstate.GetCurrentState = originalGetState }()
- jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
var patch *monkey.PatchGuard
@@ -717,6 +810,12 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
},
)
defer patch.Unpatch()
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+
+ monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+ return &sdk.OPA{}, nil // Mocked OPA instance
+ })
+ defer monkey.Unpatch(getOpaInstance)
body := map[string]interface{}{"PolicyName": jsonString}
jsonBody, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -765,3 +864,4 @@ func TestWriteErrorJSONResponse_EncodingFailure(t *testing.T) {
assert.Equal(t, http.StatusInternalServerError, response.StatusCode)
}
+
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go
index 8d1b9b4..235d56b 100644
--- a/pkg/kafkacomm/handler/pdp_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_message_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -132,13 +132,13 @@ func PdpMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic s
switch opaPdpMessage.MessageType {
case "PDP_UPDATE":
- err = PdpUpdateMessageHandler(message, p)
+ err = pdpUpdateMessageHandler(message, p)
if err != nil {
log.Warnf("Error processing Update Message: %v", err)
}
case "PDP_STATE_CHANGE":
- err = PdpStateChangeMessageHandler(message, p)
+ err = pdpStateChangeMessageHandler(message, p)
if err != nil {
log.Warnf("Error processing Update Message: %v", err)
}
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go
index 2de89ff..a2249d5 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import (
// Processes incoming messages indicating a PDP state change.
// This includes updating the PDP state and sending a status response when the state transitions.
-func PdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+func pdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error {
var pdpStateChange model.PdpStateChange
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
index 5ad495d..8bddbca 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
@@ -95,7 +95,7 @@ func TestPdpStateChangeMessageHandler(t *testing.T) {
}
// Call the handler
- err := PdpStateChangeMessageHandler(tt.message, mockSender)
+ err := pdpStateChangeMessageHandler(tt.message, mockSender)
// Check the results
if tt.expectError {
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
new file mode 100644
index 0000000..5c7651c
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
@@ -0,0 +1,390 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+
+// Processes the update message from PAP and sends the PDP status response.
+package handler
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/metrics"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/policymap"
+ "policy-opa-pdp/pkg/utils"
+ "strings"
+)
+
+// stores policy and data files to their directories.
+func createAndStorePolicyData(policy model.ToscaPolicy) error {
+
+ // Extract and decode policies
+ decodedPolicies, key, err := extractAndDecodePolicies(policy)
+ if err != nil {
+ log.Errorf("Failed to extract and decode policies for key %v: %v", key, err)
+ return err
+ }
+
+ err = createPolicyDirectories(decodedPolicies)
+ if err != nil {
+ log.Errorf("Failed to create policy directories: %v", err)
+ return err
+ }
+
+ decodedData, key, err := extractAndDecodeData(policy)
+ if err != nil {
+ log.Errorf("Failed to extract and decode data: %v", err)
+ return err
+ }
+
+ err = createDataDirectories(decodedData)
+ if err != nil {
+ log.Errorf("Failed to create data directories: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+// createPolicyDirectories creates the policy directories and saves each policy.rego file.
+func createPolicyDirectories(decodedPolicies map[string]string) error {
+
+ for key, decodedPolicy := range decodedPolicies {
+ policyDir := filepath.Join(basePolicyDir, filepath.Join(strings.Split(key, ".")...))
+
+ err := utils.CreateDirectory(policyDir)
+ if err != nil {
+ log.Errorf("Failed to create policy directory %s: %v", policyDir, err)
+ return err
+ }
+
+ err = os.WriteFile(filepath.Join(policyDir, "policy.rego"), []byte(decodedPolicy), os.ModePerm)
+ if err != nil {
+ log.Errorf("Failed to save policy.rego for %s: %v", key, err)
+ return err
+ }
+ log.Infof("Policy file saved: %s", filepath.Join(policyDir, "policy.rego"))
+ }
+
+ return nil
+}
+
+// createDataDirectories creates the data directories and saves each data.json file.
+func createDataDirectories(decodedData map[string]string) error {
+
+ for key, dataContent := range decodedData {
+ dataDir := filepath.Join(baseDataDir, filepath.Join(strings.Split(key, ".")...))
+
+ err := utils.CreateDirectory(dataDir)
+ if err != nil {
+ log.Errorf("Failed to create data directory %s: %v", dataDir, err)
+ return err
+ }
+
+ err = os.WriteFile(filepath.Join(dataDir, "data.json"), []byte(dataContent), os.ModePerm)
+ if err != nil {
+ log.Errorf("Failed to save data.json for %s: %v", key, err)
+ return err
+ }
+ log.Infof("Data file saved: %s", filepath.Join(dataDir, "data.json"))
+ }
+
+ return nil
+}
+
+// extractAndDecodePolicies extracts and Base64-decodes policies from the PDP_UPDATE message.
+func extractAndDecodePolicies(policy model.ToscaPolicy) (map[string]string, []string, error) {
+
+ decodedPolicies := make(map[string]string)
+ var keys []string
+ for key, encodedPolicy := range policy.Properties.Policy {
+ decodedPolicy, err := base64.StdEncoding.DecodeString(encodedPolicy)
+ if err != nil {
+ log.Errorf("Failed to decode policy for key: %v, %v", key, err)
+
+ return nil, nil, err
+ }
+
+ decodedPolicies[key] = string(decodedPolicy)
+ keys = append(keys, key)
+ log.Tracef("Decoded policy content: %s", decodedPolicy)
+
+ // Validate package name
+ if err := validatePackageName(key, string(decodedPolicy)); err != nil {
+
+ log.Errorf("Validation for Policy: %v failed, %v", key, err)
+ return nil, nil, err
+ }
+
+ log.Debugf("Decoded policy content for key '%s': %s", key, decodedPolicy)
+ }
+
+ return decodedPolicies, keys, nil
+}
+
+// Validate the package name extracted from the decoded policy against the key
+func validatePackageName(key, decodedPolicyContent string) error {
+ // Extract the package name from the first line of the decoded policy content
+ lines := strings.Split(decodedPolicyContent, "\n")
+ if len(lines) == 0 {
+ return fmt.Errorf("no content found in decoded policy for key '%s'", key)
+ }
+
+ // Assume the first line contains the package declaration
+ packageLine := strings.TrimSpace(lines[0])
+ if !strings.HasPrefix(packageLine, "package ") {
+ return fmt.Errorf("package declaration not found in policy content for key '%s'", key)
+ }
+
+ // Extract the actual package name
+ packageName := strings.TrimSpace(strings.TrimPrefix(packageLine, "package "))
+
+ expectedPackageName := key
+
+ // Compare the extracted package name with the expected package name
+ if packageName != expectedPackageName {
+ return fmt.Errorf("package name mismatch for key '%s': expected '%s' but got '%s'", key, expectedPackageName, packageName)
+ }
+
+ return nil
+}
+
+// extractAndDecodeData extracts and Base64-decodes data from the PDP_UPDATE message.
+func extractAndDecodeData(policy model.ToscaPolicy) (map[string]string, []string, error) {
+
+ decodedData := make(map[string]string)
+ var keys []string
+ for key, encodedData := range policy.Properties.Data {
+ decodedContent, err := base64.StdEncoding.DecodeString(encodedData)
+ if err != nil {
+ log.Errorf("Failed to decode data for key: %v, %v", key, err)
+ return nil, nil, err
+ }
+ decodedData[key] = string(decodedContent)
+ keys = append(keys, key)
+ log.Tracef("Decoded data content: %s", decodedContent)
+ }
+
+ return decodedData, keys, nil
+}
+
+// getDirName builds the directory paths for a policy's data and policy keys.
+func getDirName(policy model.ToscaPolicy) []string {
+	// Dots in each key map to nested directories under the data/policies base.
+
+ var dirNames []string
+
+ for key := range policy.Properties.Data {
+
+ dirNames = append(dirNames, strings.ReplaceAll(consts.Data+"/"+key, ".", "/"))
+
+ }
+ for key := range policy.Properties.Policy {
+
+ dirNames = append(dirNames, strings.ReplaceAll(consts.Policies+"/"+key, ".", "/"))
+
+ }
+
+ return dirNames
+}
+
+// upserts the policy into the OPA SDK.
+func upsertPolicy(policy model.ToscaPolicy) error {
+ decodedContent, keys, _ := extractAndDecodePolicies(policy)
+ for _, key := range keys {
+ policyContent := decodedContent[key]
+ err := opasdk.UpsertPolicy(context.Background(), key, []byte(policyContent))
+ if err != nil {
+ log.Errorf("Failed to Insert Policy %v", err)
+ return err
+ }
+ }
+
+ return nil
+}
+
+// handles writing data into the OPA SDK.
+func upsertData(policy model.ToscaPolicy) error {
+ decodedDataContent, dataKeys, _ := extractAndDecodeData(policy)
+ for _, dataKey := range dataKeys {
+ dataContent := decodedDataContent[dataKey]
+ reader := bytes.NewReader([]byte(dataContent))
+ decoder := json.NewDecoder(reader)
+ decoder.UseNumber()
+
+ var wdata map[string]interface{}
+ err := decoder.Decode(&wdata)
+ if err != nil {
+ log.Errorf("Failed to Insert Data: %s: %v", policy.Name, err)
+ return err
+
+ }
+ keypath := "/" + strings.Replace(dataKey, ".", "/", -1)
+ err = opasdk.WriteData(context.Background(), keypath, wdata)
+ if err != nil {
+ log.Errorf("Failed to Write Data: %s: %v", policy.Name, err)
+ return err
+
+ }
+
+ }
+ return nil
+}
+
+// handles policy deployment
+func handlePolicyDeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {
+ var failureMessages []string
+ successPolicies := make(map[string]string)
+
+ // Check if policy is deployed previously
+ pdpUpdate.PoliciesToBeDeployed = checkIfPolicyAlreadyDeployed(pdpUpdate)
+
+ for _, policy := range pdpUpdate.PoliciesToBeDeployed {
+ // Validate the policy
+
+ policyAllowed, err := validateParentPolicy(policy)
+ if err != nil {
+ log.Warnf("Tosca Policy Id validation failed for policy name as it is a parent folder: %s, %v", policy.Name, err)
+ failureMessages = append(failureMessages, fmt.Sprintf("%s, %v", policy.Name, err))
+ metrics.IncrementDeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ continue
+ }
+ if policyAllowed {
+ log.Debugf("Policy Is Allowed: %s", policy.Name)
+ }
+
+ if err := utils.ValidateToscaPolicyJsonFields(policy); err != nil {
+ log.Debugf("Tosca Policy Validation Failed for policy Name: %s, %v", policy.Name, err)
+ failureMessages = append(failureMessages, fmt.Sprintf("Tosca Policy Validation failed for Policy: %s: %v", policy.Name, err))
+ metrics.IncrementDeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ continue
+ }
+
+ // Create and store policy data
+ if err := createAndStorePolicyData(policy); err != nil {
+ failureMessages = append(failureMessages, fmt.Sprintf("%s: %v", policy.Name, err))
+ metrics.IncrementDeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ continue
+ }
+
+ // Build the bundle
+ if err := verifyPolicyByBundleCreation(policy); err != nil {
+ failureMessages = append(failureMessages, fmt.Sprintf("Failed to build Rego File for %s: %v", policy.Name, err))
+ metrics.IncrementDeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ continue
+ }
+
+ // Upsert policy and data
+ if err := upsertPolicyAndData(policy, successPolicies); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ metrics.IncrementDeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ continue
+ } else {
+ successPolicies[policy.Name] = policy.Version
+ if _, err := policymap.UpdateDeployedPoliciesinMap(policy); err != nil {
+ log.Warnf("Failed to store policy data map after deploying policy %s: %v", policy.Name, err)
+ }
+ }
+ metrics.IncrementDeploySuccessCount()
+ log.Debugf("Loaded Policy: %s", policy.Name)
+
+ }
+
+ totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
+ metrics.SetTotalPoliciesCount(int64(totalPolicies))
+
+ return failureMessages, successPolicies
+}
+
+// checks if policy exists in the map.
+func checkIfPolicyAlreadyDeployed(pdpUpdate model.PdpUpdate) []model.ToscaPolicy {
+ if len(policymap.LastDeployedPolicies) > 0 {
+ log.Debugf("Check if Policy is Already Deployed: %v", policymap.LastDeployedPolicies)
+ return policymap.VerifyAndReturnPoliciesToBeDeployed(policymap.LastDeployedPolicies, pdpUpdate)
+ }
+ return pdpUpdate.PoliciesToBeDeployed
+}
+
+// verifies the policy by creating a bundle.
+func verifyPolicyByBundleCreation(policy model.ToscaPolicy) error {
+ // get directory name
+ dirNames := getDirName(policy)
+ if len(dirNames) == 0 {
+ log.Warnf("Unable to extract folder name from policy %s", policy.Name)
+ return fmt.Errorf("failed to extract folder name")
+ }
+ // create bundle
+ output, err := createBundleFunc(exec.Command, policy)
+ if err != nil {
+ log.Warnf("Failed to initialize bundle for %s: %s", policy.Name, string(output))
+ for _, dirPath := range dirNames {
+ if removeErr := utils.RemoveDirectory(dirPath); removeErr != nil {
+ log.Errorf("Error removing directory for policy %s: %v", policy.Name, removeErr)
+ }
+ }
+ log.Debugf("Directory cleanup as bundle creation failed")
+ return fmt.Errorf("failed to build bundle: %v", err)
+ }
+ return nil
+}
+
+// handles upsert of both policy and data
+func upsertPolicyAndData(policy model.ToscaPolicy, successPolicies map[string]string) error {
+ if err := upsertPolicy(policy); err != nil {
+ log.Warnf("Failed to upsert policy: %v", err)
+ return fmt.Errorf("Failed to Insert Policy: %s: %v", policy.Name, err)
+ }
+
+ if err := upsertData(policy); err != nil {
+ return fmt.Errorf("Failed to Write Data: %s: %v", policy.Name, err)
+ }
+
+ return nil
+}
+
+// validates whether the new policy is a parent of an existing policy
+func validateParentPolicy(policy model.ToscaPolicy) (bool, error) {
+ policiesmap, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
+ if err != nil {
+ log.Warnf("Failed to extract deployed policies: %v", err)
+ return false, err
+ }
+
+ policyAllowed, err := utils.IsPolicyNameAllowed(policy, policiesmap)
+
+ if err != nil {
+ log.Warnf("Tosca Policy Id validation failed for policy name as it is a parent folder: %s, %v", policy.Name, err)
+ return false, err
+ }
+ return policyAllowed, nil
+
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
index efe115c..148e5b5 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -21,30 +21,49 @@ package handler
import (
"encoding/json"
- "github.com/go-playground/validator/v10"
+ "fmt"
+ "os/exec"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/bundleserver"
"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpattributes"
+ "policy-opa-pdp/pkg/policymap"
+ "policy-opa-pdp/pkg/utils"
+ "strings"
+)
+
+var (
+ basePolicyDir = consts.Policies
+ baseDataDir = consts.Data
)
// Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP).
// It validates the incoming data, updates PDP attributes, and sends a response back to the sender.
-func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+ var failureMessages []string
var pdpUpdate model.PdpUpdate
+ var loggingPoliciesList string
err := json.Unmarshal(message, &pdpUpdate)
if err != nil {
log.Debugf("Failed to UnMarshal Messages: %v\n", err)
+ resMessage := fmt.Errorf("PDP Update Failed: %v", err)
+ if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Debugf("Failed to send update error response: %v", err)
+ return err
+ }
return err
}
- //Initialize Validator and validate Struct after unmarshalling
- validate := validator.New()
- err = validate.Struct(pdpUpdate)
+ //Initialize Validator and validate Struct after unmarshalling
+ err = utils.ValidateFieldsStructs(pdpUpdate)
if err != nil {
- for _, err := range err.(validator.ValidationErrors) {
- log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag())
+ resMessage := fmt.Errorf("PDP Update Failed: %v", err)
+ if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Debugf("Failed to send update error response: %v", err)
+ return err
}
return err
}
@@ -54,12 +73,112 @@ func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
- err = publisher.SendPdpUpdateResponse(p, &pdpUpdate)
- if err != nil {
- log.Debugf("Failed to Send Update Response Message: %v\n", err)
- return err
+ if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
+ failureMessage, successfullyDeployedPolicies := handlePolicyDeployment(pdpUpdate, p)
+ mapJson, err := policymap.FormatMapofAnyType(successfullyDeployedPolicies)
+ if len(failureMessage) > 0 {
+ failureMessages = append(failureMessages, "{Deployment Errors:"+strings.Join(failureMessage, "")+"}")
+ }
+ if err != nil {
+ failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
+ resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyDeployedPolicies json %v", failureMessages)
+ if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Debugf("Failed to send update error response: %v", err)
+ return err
+ }
+ }
+ loggingPoliciesList = mapJson
+ }
+
+ // Check if "PoliciesToBeUndeployed" is empty or not
+ if len(pdpUpdate.PoliciesToBeUndeployed) > 0 {
+ log.Infof("Found Policies to be undeployed")
+ failureMessage, successfullyUndeployedPolicies := handlePolicyUndeployment(pdpUpdate, p)
+ mapJson, err := policymap.FormatMapofAnyType(successfullyUndeployedPolicies)
+ if len(failureMessage) > 0 {
+ failureMessages = append(failureMessages, "{UnDeployment Errors:"+strings.Join(failureMessage, "")+"}")
+ }
+ if err != nil {
+ failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
+ resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyUnDeployedPolicies json %v", failureMessages)
+ if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Debugf("Failed to send update error response: %v", err)
+ return err
+ }
+ }
+ loggingPoliciesList = mapJson
+ }
+
+ if len(pdpUpdate.PoliciesToBeDeployed) == 0 && len(pdpUpdate.PoliciesToBeUndeployed) == 0 {
+ //Response for PAP Registration
+ err = sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successful")
+ if err != nil {
+ log.Debugf("Failed to Send Update Response Message: %v\n", err)
+ return err
+ }
+ } else {
+ //Send Response for Deployment or Undeployment or when both deployment and undeployment comes together
+ if err := sendPDPStatusResponse(pdpUpdate, p, loggingPoliciesList, failureMessages); err != nil {
+ return err
+ }
}
log.Infof("PDP_STATUS Message Sent Successfully")
go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
return nil
}
+
+// builds the bundle tar file
+func createBundleFunc(execCmd func(string, ...string) *exec.Cmd, toscaPolicy model.ToscaPolicy) (string, error) {
+ return bundleserver.BuildBundle(execCmd)
+}
+
+func sendSuccessResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage string) error {
+ if err := publisher.SendPdpUpdateResponse(p, pdpUpdate, respMessage); err != nil {
+ return err
+ }
+ return nil
+}
+
+func sendFailureResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage error) error {
+ if err := publisher.SendPdpUpdateErrorResponse(p, pdpUpdate, respMessage); err != nil {
+ return err
+ }
+ return nil
+}
+
+func sendPDPStatusResponse(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender, loggingPoliciesList string, failureMessages []string) error {
+ if len(failureMessages) > 0 {
+ resMessage := fmt.Errorf("PDP Update Failed: %v", failureMessages)
+ if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Warnf("Failed to send update error response: %v", err)
+ return err
+ }
+ } else {
+
+ if len(pdpUpdate.PoliciesToBeUndeployed) == 0 {
+ resMessage := fmt.Sprintf("PDP Update Successful for all policies: %v", loggingPoliciesList)
+ if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Warnf("Failed to send update response: %v", err)
+ return err
+ }
+ log.Infof("Processed policies_to_be_deployed successfully")
+ } else if len(pdpUpdate.PoliciesToBeDeployed) == 0 {
+
+ resMessage := fmt.Sprintf("PDP Update Policies undeployed: %v", loggingPoliciesList)
+
+ if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil {
+ log.Warnf("Failed to Send Update Response Message: %v", err)
+ return err
+ }
+ log.Infof("Processed policies_to_be_undeployed successfully")
+ } else {
+
+ if err := sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successful"); err != nil {
+ log.Warnf("Failed to Send Update Response Message: %v", err)
+ return err
+ }
+ }
+
+ }
+ return nil
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
index 4d5d7dc..d29c814 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -1,5 +1,5 @@
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,7 +22,9 @@ import (
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "policy-opa-pdp/consts"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "policy-opa-pdp/pkg/policymap"
"testing"
)
@@ -50,7 +52,7 @@ func TestPdpUpdateMessageHandler_Success(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.NoError(t, err)
}
@@ -70,7 +72,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure1(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
@@ -91,7 +93,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure2(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
@@ -112,7 +114,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure3(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
@@ -131,7 +133,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure4(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
@@ -160,7 +162,7 @@ func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error in Sending PDP Update Response"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
@@ -190,7 +192,63 @@ func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Invalid Interval Time for Heartbeat"))
- err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
assert.Error(t, err)
}
+
+/*
+PdpUpdateMessageHandler_Successful_Deployment
+Description: Test by sending a valid input with policies to be deployed
+Input: valid input with PoliciesToBeDeployed
+Expected Output: Policies should be deployed successfully and corresponding messages sent.
+*/
+func TestPdpUpdateMessageHandler_Successful_Deployment(t *testing.T) {
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ consts.Data = "/tmp/data"
+ consts.Policies = "/tmp/policies"
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.NoError(t, err)
+}
+
+/*
+PdpUpdateMessageHandler_Skipping_Deployment
+Description: Test by sending a valid input with policies to be deployed where the policy is already deployed
+Input: valid input with PoliciesToBeDeployed
+Expected Output: Deployment should be skipped since the policy is already present.
+*/
+func TestPdpUpdateMessageHandler_Skipping_Deployment(t *testing.T) {
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"data": ["zone"],"policy": ["zone"],"policy-id": "zone","policy-version": "1.0.0"}]}`
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.NoError(t, err)
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
new file mode 100644
index 0000000..9770d88
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
@@ -0,0 +1,196 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+
+// Processes the update message from PAP and sends the PDP status response.
+package handler
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/metrics"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/policymap"
+ "policy-opa-pdp/pkg/utils"
+ "strings"
+)
+
+// processPoliciesTobeUndeployed handles the undeployment of policies
+func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
+ var failureMessages []string
+
+ successfullyUndeployedPolicies := make(map[string]string)
+
+ // Unmarshal the last known policies
+ deployedPolicies, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ }
+
+ for policyID, policyVersion := range undeployedPolicies {
+ // Check if undeployed policy exists in deployedPolicies
+ matchedPolicy := findDeployedPolicy(policyID, policyVersion, deployedPolicies)
+ if matchedPolicy != nil {
+ // Handle undeployment for the policy
+ errs := policyUndeploymentAction(matchedPolicy)
+ if len(errs) > 0 {
+ metrics.IncrementUndeployFailureCount()
+ metrics.IncrementTotalErrorCount()
+ failureMessages = append(failureMessages, errs...)
+ }
+ deployedPoliciesMap, err := policymap.RemoveUndeployedPoliciesfromMap(matchedPolicy)
+ if err != nil {
+ log.Warnf("Policy Name: %s, Version: %s is not removed from LastDeployedPolicies", policyID, policyVersion)
+ failureMessages = append(failureMessages, "Error in removing from LastDeployedPolicies")
+ }
+ log.Debugf("Policies Map After Undeployment : %s", deployedPoliciesMap)
+ metrics.IncrementUndeploySuccessCount()
+ successfullyUndeployedPolicies[policyID] = policyVersion
+ } else {
+ // Log failure if no match is found
+ log.Debugf("Policy Name: %s, Version: %s is marked for undeployment but was not deployed", policyID, policyVersion)
+ continue
+ }
+ }
+
+ totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
+ metrics.SetTotalPoliciesCount(int64(totalPolicies))
+
+ return failureMessages, successfullyUndeployedPolicies
+}
+
+func handlePolicyUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {
+
+ // Extract undeployed policies into a dictionary
+ undeployedPoliciesDict := extractUndeployedPolicies(pdpUpdate.PoliciesToBeUndeployed)
+
+ // Process undeployment actions
+ errorMessages, successfullyUndeployedPolicies := processPoliciesTobeUndeployed(undeployedPoliciesDict)
+
+ return errorMessages, successfullyUndeployedPolicies
+
+}
+
+// extractUndeployedPolicies extracts policy names and versions into a map
+func extractUndeployedPolicies(policies []model.ToscaConceptIdentifier) map[string]string {
+ undeployedPoliciesDict := make(map[string]string)
+ for _, policy := range policies {
+ undeployedPoliciesDict[policy.Name] = policy.Version
+ log.Infof("Extracted Policy Name: %s, Version: %s for undeployment", policy.Name, policy.Version)
+ }
+ return undeployedPoliciesDict
+}
+
+// policyUndeploymentAction processes the actual undeployment actions for a policy
+func policyUndeploymentAction(policy map[string]interface{}) []string {
+ var failureMessages []string
+
+ // Delete "policy" sdk and directories
+ policyErrors := removePolicyFromSdkandDir(policy)
+ failureMessages = append(failureMessages, policyErrors...)
+
+ // Delete "data" sdk and directories
+ dataErrors := removeDataFromSdkandDir(policy)
+ failureMessages = append(failureMessages, dataErrors...)
+
+ return failureMessages
+}
+
+// removeDataFromSdkandDir handles the "data" directories in the policy
+func removeDataFromSdkandDir(policy map[string]interface{}) []string {
+ var failureMessages []string
+
+ if dataKeys, ok := policy["data"].([]interface{}); ok {
+ for _, dataKey := range dataKeys {
+ key, ok := dataKey.(string)
+ if !ok {
+ failureMessages = append(failureMessages, fmt.Sprintf("invalid data key type: %v", dataKey))
+ continue
+ }
+ keyPath := "/" + strings.ReplaceAll(key, ".", "/")
+ log.Debugf("Deleting data from OPA at keypath: %s", keyPath)
+ if err := opasdk.DeleteData(context.Background(), keyPath); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ continue
+ }
+ if err := removeDataDirectory(keyPath); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ }
+ }
+ } else {
+ failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'data' is missing or not an array", policy["policy-id"], policy["policy-version"]))
+ }
+
+ return failureMessages
+}
+
+// removePolicyFromSdkandDir handles the "policy" directories in the policy
+func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
+ var failureMessages []string
+
+ if policyKeys, ok := policy["policy"].([]interface{}); ok {
+ for _, policyKey := range policyKeys {
+ key, ok := policyKey.(string)
+ if !ok {
+ failureMessages = append(failureMessages, fmt.Sprintf("invalid policy key type: %v", policyKey))
+ continue
+ }
+ keyPath := "/" + strings.ReplaceAll(key, ".", "/")
+ if err := opasdk.DeletePolicy(context.Background(), key); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ continue
+ }
+ if err := removePolicyDirectory(keyPath); err != nil {
+ failureMessages = append(failureMessages, err.Error())
+ }
+ }
+ } else {
+ failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'policy' is missing or not an array", policy["policy-id"], policy["policy-version"]))
+ }
+
+ return failureMessages
+}
+
+// removeDataDirectory removes the directory for a data key
+func removeDataDirectory(dataKey string) error {
+ dataPath := filepath.Join(consts.Data, dataKey)
+ log.Debugf("Removing data directory: %s", dataPath)
+ if err := utils.RemoveDirectory(dataPath); err != nil {
+ return fmt.Errorf("failed to remove directory for data %s: %w", dataPath, err)
+ }
+ return nil
+}
+
+// removePolicyDirectory removes the directory for a policy key
+func removePolicyDirectory(policyKey string) error {
+ policyPath := filepath.Join(consts.Policies, policyKey)
+ log.Debugf("Removing policy directory: %s", policyPath)
+ if err := utils.RemoveDirectory(policyPath); err != nil {
+ return fmt.Errorf("failed to remove directory for policy %s: %w", policyPath, err)
+ }
+ return nil
+}
+
+// findDeployedPolicy searches for a policy in deployedPolicies
+func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[string]interface{}) map[string]interface{} {
+ for _, policy := range deployedPolicies {
+ // Extract policy-id and policy-version from the deployed policy
+ id, idOk := policy["policy-id"].(string)
+ version, versionOk := policy["policy-version"].(string)
+
+ // Check if the deployed policy matches the undeployed policy
+ if idOk && versionOk && id == policyID && version == policyVersion {
+ return policy
+ }
+ }
+ return nil
+}
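For reviewers, the name/version matching introduced above can be exercised in isolation. The sketch below uses simplified stand-in types (`conceptID` is not the project's `model.ToscaConceptIdentifier`) and mirrors `extractUndeployedPolicies` and `findDeployedPolicy` under those assumptions:

```go
package main

import "fmt"

// conceptID is a hypothetical stand-in for model.ToscaConceptIdentifier.
type conceptID struct {
	Name    string
	Version string
}

// extract mirrors extractUndeployedPolicies: index policy name -> version.
func extract(policies []conceptID) map[string]string {
	m := make(map[string]string)
	for _, p := range policies {
		m[p.Name] = p.Version
	}
	return m
}

// find mirrors findDeployedPolicy: match on policy-id and policy-version,
// tolerating entries whose fields are missing or not strings.
func find(id, version string, deployed []map[string]interface{}) map[string]interface{} {
	for _, p := range deployed {
		pid, ok1 := p["policy-id"].(string)
		ver, ok2 := p["policy-version"].(string)
		if ok1 && ok2 && pid == id && ver == version {
			return p
		}
	}
	return nil
}

func main() {
	deployed := []map[string]interface{}{
		{"policy-id": "p1", "policy-version": "1.0.0"},
	}
	for name, ver := range extract([]conceptID{{"p1", "1.0.0"}, {"p2", "2.0.0"}}) {
		if find(name, ver, deployed) != nil {
			fmt.Println(name, "matched for undeployment")
		} else {
			fmt.Println(name, "marked for undeployment but not deployed")
		}
	}
}
```

One caveat the sketch makes visible: because the map is keyed by policy name only, two versions of the same policy in a single message would collapse to the last entry.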
diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go
index 3d19e6c..ce7fc4c 100644
--- a/pkg/kafkacomm/pdp_topic_consumer.go
+++ b/pkg/kafkacomm/pdp_topic_consumer.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -93,6 +93,9 @@ func NewKafkaConsumer() (*KafkaConsumer, error) {
configMap.SetKey("sasl.username", username)
configMap.SetKey("sasl.password", password)
configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
+ configMap.SetKey("fetch.max.bytes", 50*1024*1024)
+ configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024)
+ configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024)
configMap.SetKey("session.timeout.ms", "30000")
configMap.SetKey("max.poll.interval.ms", "300000")
configMap.SetKey("enable.partition.eof", true)
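The three new settings raise the consumer's fetch ceilings to 50 MiB each, presumably so large PDP_UPDATE messages carrying embedded policies are not truncated. A dependency-free sketch of the values (the real code sets them on a confluent-kafka-go `kafka.ConfigMap`):

```go
package main

import "fmt"

const fetchLimit = 50 * 1024 * 1024 // 50 MiB

// buildFetchConfig mirrors the new consumer settings as plain key/value
// pairs; the actual patch applies them via configMap.SetKey.
func buildFetchConfig() map[string]interface{} {
	return map[string]interface{}{
		"fetch.max.bytes":             fetchLimit,
		"max.partition.fetch.bytes":   fetchLimit,
		"socket.receive.buffer.bytes": fetchLimit,
	}
}

func main() {
	for k, v := range buildFetchConfig() {
		fmt.Printf("%s = %d bytes\n", k, v)
	}
}
```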
diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go
index 1518474..eb2db03 100644
--- a/pkg/kafkacomm/pdp_topic_consumer_test.go
+++ b/pkg/kafkacomm/pdp_topic_consumer_test.go
@@ -20,16 +20,16 @@
package kafkacomm
import (
+ "bou.ke/monkey"
"errors"
- "policy-opa-pdp/pkg/kafkacomm/mocks"
- "testing"
- "sync"
- "fmt"
+ "fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"policy-opa-pdp/cfg"
- "bou.ke/monkey"
+ "policy-opa-pdp/pkg/kafkacomm/mocks"
+ "sync"
+ "testing"
)
var kafkaConsumerFactory = kafka.NewConsumer
@@ -175,34 +175,34 @@ func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) {
}
-//Helper function to reset
+// Helper function to reset
func resetKafkaConsumerSingleton() {
- consumerOnce = sync.Once{}
- consumerInstance = nil
+ consumerOnce = sync.Once{}
+ consumerInstance = nil
}
-//Test for mock error creating consumers
+// Test for mock error creating consumers
func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) {
- resetKafkaConsumerSingleton()
- monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
- return nil, fmt.Errorf("mock error creating consumer")
- })
- defer monkey.Unpatch(kafka.NewConsumer)
-
- consumer, err := NewKafkaConsumer()
- assert.Nil(t, consumer)
- assert.EqualError(t, err, "Kafka Consumer instance not created")
+ resetKafkaConsumerSingleton()
+ monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+ return nil, fmt.Errorf("mock error creating consumer")
+ })
+ defer monkey.Unpatch(kafka.NewConsumer)
+
+ consumer, err := NewKafkaConsumer()
+ assert.Nil(t, consumer)
+ assert.EqualError(t, err, "Kafka Consumer instance not created")
}
// Test for error creating kafka instance
func TestNewKafkaConsumer_NilConsumer(t *testing.T) {
- resetKafkaConsumerSingleton()
- monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
- return nil, nil
- })
- defer monkey.Unpatch(kafka.NewConsumer)
-
- consumer, err := NewKafkaConsumer()
- assert.Nil(t, consumer)
- assert.EqualError(t, err, "Kafka Consumer instance not created")
+ resetKafkaConsumerSingleton()
+ monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+ return nil, nil
+ })
+ defer monkey.Unpatch(kafka.NewConsumer)
+
+ consumer, err := NewKafkaConsumer()
+ assert.Nil(t, consumer)
+ assert.EqualError(t, err, "Kafka Consumer instance not created")
}
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go
index 42a5e70..0f68840 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go
@@ -30,6 +30,7 @@ import (
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpattributes"
"policy-opa-pdp/pkg/pdpstate"
+ "policy-opa-pdp/pkg/policymap"
"sync"
"time"
)
@@ -43,13 +44,17 @@ var (
// Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
- if intervalMs <= 0 {
+ mu.Lock()
+ defer mu.Unlock()
+
+ if intervalMs < 0 {
- log.Errorf("Invalid interval provided: %d. Interval must be greater than zero.", intervalMs)
+ log.Errorf("Invalid interval provided: %d. Interval must not be negative.", intervalMs)
ticker = nil
return
+ } else if intervalMs == 0 {
+ intervalMs = currentInterval
+ }
- mu.Lock()
- defer mu.Unlock()
if ticker != nil && intervalMs == currentInterval {
log.Debug("Ticker is already running")
@@ -59,7 +64,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
if ticker != nil {
ticker.Stop()
}
- // StopTicker()
+
currentInterval = intervalMs
ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
@@ -89,12 +94,23 @@ func sendPDPHeartBeat(s PdpStatusSender) error {
Healthy: model.Healthy,
Name: pdpattributes.PdpName,
Description: "Pdp heartbeat",
+ Policies: []model.ToscaConceptIdentifier{},
PdpGroup: consts.PdpGroup,
PdpSubgroup: &pdpattributes.PdpSubgroup,
}
pdpStatus.RequestID = uuid.New().String()
pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+ policiesMap := policymap.LastDeployedPolicies
+
+ if policiesMap != "" {
+ if policies := policymap.ExtractDeployedPolicies(policiesMap); policies == nil {
+ log.Warnf("No Policies extracted from Policy Map")
+ } else {
+ pdpStatus.Policies = policies
+ }
+ }
+
err := s.SendPdpStatus(pdpStatus)
log.Debugf("Sending Heartbeat ...")
if err != nil {
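The reworked guard gives `intervalMs` three cases: negative is rejected, zero reuses the current interval, and a positive value replaces it. That decision, factored into a hypothetical pure helper (not part of the patch), looks like:

```go
package main

import "fmt"

// effectiveInterval mirrors the new StartHeartbeatIntervalTimer guard:
// negative intervals are invalid, zero falls back to the current interval,
// and any positive value becomes the new interval.
func effectiveInterval(requested, current int64) (int64, bool) {
	if requested < 0 {
		return 0, false // invalid; the ticker is left untouched
	}
	if requested == 0 {
		return current, true // keep the existing interval
	}
	return requested, true
}

func main() {
	for _, req := range []int64{-1, 0, 2000} {
		iv, ok := effectiveInterval(req, 5000)
		fmt.Printf("requested=%d -> interval=%d valid=%t\n", req, iv, ok)
	}
}
```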
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
index ba72c77..c3676c8 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
@@ -25,8 +25,7 @@ import (
"github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
"testing"
- )
-
+)
/*
Success Case 1
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go
index 5525f61..c726b2b 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go
@@ -54,7 +54,6 @@ func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
log.Warnf("failed to marshal PdpStatus to JSON: %v", err)
return err
}
- log.Debugf("Producer saved in RealPdp StatusSender")
kafkaMessage := &kafka.Message{
TopicPartition: kafka.TopicPartition{
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
index 84013cb..8618eeb 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
@@ -22,14 +22,14 @@ package publisher
import (
"errors"
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
- "time"
- "github.com/google/uuid"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
- "github.com/confluentinc/confluent-kafka-go/kafka"
"policy-opa-pdp/pkg/model"
"testing"
+ "time"
)
type MockPdpStatusSender struct {
@@ -140,4 +140,3 @@ func TestSendPdpStatus_Failure(t *testing.T) {
// Verify that the Produce method was called exactly once
mockProducer.AssertExpectations(t)
}
-
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go
index 4a13b1c..3e25780 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go
@@ -29,6 +29,7 @@ import (
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpattributes"
"policy-opa-pdp/pkg/pdpstate"
+ "policy-opa-pdp/pkg/policymap"
"time"
"github.com/google/uuid"
@@ -36,10 +37,10 @@ import (
// Sends a PDP_STATUS message to indicate the successful processing of a PDP_UPDATE request
// received from the Policy Administration Point (PAP).
-func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error {
+func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error {
responseStatus := model.Success
- responseMessage := "PDP Update was Successful"
+ responseMessage := resMessage
pdpStatus := model.PdpStatus{
MessageType: model.PDP_STATUS,
@@ -50,7 +51,7 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
Description: "Pdp Status Response Message For Pdp Update",
PdpGroup: consts.PdpGroup,
PdpSubgroup: &pdpattributes.PdpSubgroup,
- // Policies: [],
+ Policies: []model.ToscaConceptIdentifier{},
PdpResponse: &model.PdpResponseDetails{
ResponseTo: &pdpUpdate.RequestId,
ResponseStatus: &responseStatus,
@@ -61,6 +62,16 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
pdpStatus.RequestID = uuid.New().String()
pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+ policiesMap := policymap.LastDeployedPolicies
+
+ if policiesMap != "" {
+ if policies := policymap.ExtractDeployedPolicies(policiesMap); policies == nil {
+ log.Warnf("No Policies extracted from Policy Map")
+ } else {
+ pdpStatus.Policies = policies
+ }
+ }
+
log.Infof("Sending PDP Status With Update Response")
err := s.SendPdpStatus(pdpStatus)
@@ -73,6 +84,53 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
}
+func SendPdpUpdateErrorResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, err error) error {
+
+ responseStatus := model.Failure
+ responseMessage := fmt.Sprintf("%v", err)
+
+ pdpStatus := model.PdpStatus{
+ MessageType: model.PDP_STATUS,
+ PdpType: consts.PdpType,
+ State: pdpstate.State,
+ Healthy: model.Healthy,
+ Name: pdpattributes.PdpName,
+ Description: "Pdp Status Response Message For Pdp Update",
+ PdpGroup: consts.PdpGroup,
+ PdpSubgroup: &pdpattributes.PdpSubgroup,
+ Policies: []model.ToscaConceptIdentifier{},
+ PdpResponse: &model.PdpResponseDetails{
+ ResponseTo: &pdpUpdate.RequestId,
+ ResponseStatus: &responseStatus,
+ ResponseMessage: &responseMessage,
+ },
+ }
+
+ pdpStatus.RequestID = uuid.New().String()
+ pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+
+ policiesMap := policymap.LastDeployedPolicies
+
+ if policiesMap != "" {
+ if policies := policymap.ExtractDeployedPolicies(policiesMap); policies == nil {
+ log.Warnf("No Policies extracted from Policy Map")
+ } else {
+ pdpStatus.Policies = policies
+ }
+ }
+
+ log.Infof("Sending PDP Status With Update Error Response")
+
+ err = s.SendPdpStatus(pdpStatus)
+ if err != nil {
+ log.Warnf("Failed to send PDP Update Message : %v", err)
+ return err
+ }
+
+ return nil
+
+}
+
// Sends a PDP_STATUS message to indicate a state change in the PDP (e.g., from PASSIVE to ACTIVE).
func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateChange) error {
@@ -87,7 +145,7 @@ func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateCh
Description: "Pdp Status Response Message to Pdp State Change",
PdpGroup: consts.PdpGroup,
PdpSubgroup: &pdpattributes.PdpSubgroup,
- // Policies: [],
+ Policies: []model.ToscaConceptIdentifier{},
PdpResponse: &model.PdpResponseDetails{
ResponseTo: &pdpStateChange.RequestId,
ResponseStatus: &responseStatus,
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
index 83154ca..17805fe 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
@@ -35,7 +35,7 @@ func TestSendPdpUpdateResponse_Success(t *testing.T) {
mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
- err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
assert.NoError(t, err)
mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
}
@@ -48,7 +48,7 @@ func TestSendPdpUpdateResponse_Failure(t *testing.T) {
pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
- err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Failure")
assert.Error(t, err)
diff --git a/pkg/metrics/counters.go b/pkg/metrics/counters.go
index e126554..7734cff 100644
--- a/pkg/metrics/counters.go
+++ b/pkg/metrics/counters.go
@@ -25,12 +25,13 @@ import "sync"
var TotalErrorCount int64
var DecisionSuccessCount int64
var DecisionFailureCount int64
+var DeployFailureCount int64
+var DeploySuccessCount int64
+var UndeployFailureCount int64
+var UndeploySuccessCount int64
+var TotalPoliciesCount int64
var mu sync.Mutex
-
-
-
-
// Increment counter
func IncrementTotalErrorCount() {
mu.Lock()
@@ -39,7 +40,7 @@ func IncrementTotalErrorCount() {
}
// returns pointer to the counter
-func TotalErrorCountRef() *int64 {
+func totalErrorCountRef() *int64 {
mu.Lock()
defer mu.Unlock()
return &TotalErrorCount
@@ -53,7 +54,7 @@ func IncrementDecisionSuccessCount() {
}
// returns pointer to the counter
-func TotalDecisionSuccessCountRef() *int64 {
+func totalDecisionSuccessCountRef() *int64 {
mu.Lock()
defer mu.Unlock()
return &DecisionSuccessCount
@@ -74,3 +75,84 @@ func TotalDecisionFailureCountRef() *int64 {
return &DecisionFailureCount
}
+
+// Increment counter
+func IncrementDeploySuccessCount() {
+ mu.Lock()
+ DeploySuccessCount++
+ mu.Unlock()
+}
+
+// returns pointer to the counter
+func totalDeploySuccessCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return &DeploySuccessCount
+}
+
+// Increment counter
+func IncrementDeployFailureCount() {
+ mu.Lock()
+ DeployFailureCount++
+ mu.Unlock()
+}
+
+// returns pointer to the counter
+func totalDeployFailureCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return &DeployFailureCount
+}
+
+// Increment counter
+func IncrementUndeploySuccessCount() {
+ mu.Lock()
+ UndeploySuccessCount++
+ mu.Unlock()
+}
+
+// returns pointer to the counter
+func totalUndeploySuccessCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return &UndeploySuccessCount
+}
+
+// Increment counter
+func IncrementUndeployFailureCount() {
+ mu.Lock()
+ UndeployFailureCount++
+ mu.Unlock()
+}
+
+// returns pointer to the counter
+func totalUndeployFailureCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return &UndeployFailureCount
+}
+
+// Sets the total policies count
+func SetTotalPoliciesCount(newCount int64) {
+ mu.Lock()
+ TotalPoliciesCount = newCount
+ mu.Unlock()
+}
+
+// returns pointer to the counter
+func totalPoliciesCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return &TotalPoliciesCount
+}
+
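counters.go guards every counter with one package-level mutex. A sketch of an alternative using `sync/atomic` (Go 1.19+; a design option for consideration, not what this patch does):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// deploySuccessCount replaces a mutex-guarded int64 with an atomic counter.
var deploySuccessCount atomic.Int64

func IncrementDeploySuccessCount()  { deploySuccessCount.Add(1) }
func TotalDeploySuccessCount() int64 { return deploySuccessCount.Load() }

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); IncrementDeploySuccessCount() }()
	}
	wg.Wait()
	fmt.Println(TotalDeploySuccessCount()) // 100
}
```

One caveat with the current pattern worth flagging in review: the `...Ref()` accessors hold the mutex only while taking the counter's address, and callers then dereference the returned `*int64` outside the lock, so those reads are not actually protected. Atomics, or returning values instead of pointers, would close that gap.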
diff --git a/pkg/metrics/counters_test.go b/pkg/metrics/counters_test.go
index ba8646b..852e365 100644
--- a/pkg/metrics/counters_test.go
+++ b/pkg/metrics/counters_test.go
@@ -39,7 +39,7 @@ func TestCounters(t *testing.T) {
}()
}
wg.Wait()
- assert.Equal(t, int64(5), *TotalErrorCountRef())
+ assert.Equal(t, int64(5), *totalErrorCountRef())
// Test IncrementQuerySuccessCount and TotalQuerySuccessCountRef
@@ -61,7 +61,7 @@ func TestCounters(t *testing.T) {
wg.Wait()
- assert.Equal(t, int64(7), *TotalDecisionSuccessCountRef())
+ assert.Equal(t, int64(7), *totalDecisionSuccessCountRef())
// Test IncrementDecisionFailureCount and TotalDecisionFailureCountRef
diff --git a/pkg/metrics/statistics-provider.go b/pkg/metrics/statistics-provider.go
index 9b34839..0f826bc 100644
--- a/pkg/metrics/statistics-provider.go
+++ b/pkg/metrics/statistics-provider.go
@@ -57,20 +57,20 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
var statReport oapicodegen.StatisticsReport
- statReport.DecisionSuccessCount = TotalDecisionSuccessCountRef()
+
+ statReport.DecisionSuccessCount = totalDecisionSuccessCountRef()
statReport.DecisionFailureCount = TotalDecisionFailureCountRef()
- statReport.TotalErrorCount = TotalErrorCountRef()
+ statReport.TotalErrorCount = totalErrorCountRef()
+ statReport.DeployFailureCount = totalDeployFailureCountRef()
+ statReport.DeploySuccessCount = totalDeploySuccessCountRef()
+ statReport.UndeployFailureCount = totalUndeployFailureCountRef()
+ statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
+ statReport.TotalPoliciesCount = totalPoliciesCountRef()
- // not implemented hardcoding the values to zero
- // will be implemeneted in phase-2
+ // not yet implemented; hardcoding the policy types count to one
+ // will be implemented in phase-2
- zerovalue := int64(0)
onevalue := int64(1)
- statReport.TotalPoliciesCount = &zerovalue
statReport.TotalPolicyTypesCount = &onevalue
- statReport.DeployFailureCount = &zerovalue
- statReport.DeploySuccessCount = &zerovalue
- statReport.UndeployFailureCount = &zerovalue
- statReport.UndeploySuccessCount = &zerovalue
value := int32(200)
statReport.Code = &value
diff --git a/pkg/model/mesages.go b/pkg/model/mesages.go
index 269f45f..966cd46 100644
--- a/pkg/model/mesages.go
+++ b/pkg/model/mesages.go
@@ -86,11 +86,11 @@ type PdpStatus struct {
// models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/PdpUpdate.java
type PdpUpdate struct {
Source string `json:"source" validate:"required"`
- PdpHeartbeatIntervalMs int64 `json:"pdpHeartbeatIntervalMs" validate:"required"`
+ PdpHeartbeatIntervalMs int64 `json:"pdpHeartbeatIntervalMs"`
MessageType string `json:"messageName" validate:"required"`
- PoliciesToBeDeloyed []string `json:"policiesToBeDeployed" validate:"required"`
- policiesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"`
- Name string `json:"name" validate:"required"`
+ PoliciesToBeDeployed []ToscaPolicy `json:"policiesToBeDeployed" validate:"required"`
+ PoliciesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"`
+ Name string `json:"name"`
TimestampMs int64 `json:"timestampMs" validate:"required"`
PdpGroup string `json:"pdpGroup" validate:"required"`
PdpSubgroup string `json:"pdpSubgroup" validate:"required"`
diff --git a/pkg/model/messages_test.go b/pkg/model/messages_test.go
index 217cd2a..536f683 100644
--- a/pkg/model/messages_test.go
+++ b/pkg/model/messages_test.go
@@ -114,8 +114,8 @@ func (p *PdpUpdate) Validate() error {
if p.MessageType == "" {
return errors.New("MessageType is required")
}
- if len(p.PoliciesToBeDeloyed) == 0 {
- return errors.New("PoliciesToBeDeloyed is required and must contain at least one policy")
+ if len(p.PoliciesToBeDeployed) == 0 {
+ return errors.New("PoliciesToBeDeployed is required and must contain at least one policy")
}
if p.Name == "" {
return errors.New("Name is required")
@@ -138,11 +138,52 @@ func (p *PdpUpdate) Validate() error {
// TestPdpUpdateSerialization_Positive tests the successful serialization of PdpUpdate.
func TestPdpUpdateSerialization_Success(t *testing.T) {
+ policies := []ToscaPolicy{
+ {
+ Type: "onap.policies.native.opa",
+ TypeVersion: "1.0",
+ Properties: PolicyProperties{
+ Data: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ Policy: map[string]string{
+ "policyKey1": "policyValue1",
+ },
+ },
+ Name: "MySecurityPolicy",
+ Version: "1.0.0",
+ Metadata: Metadata{
+ PolicyID: "policy-id-001",
+ PolicyVersion: "1.0",
+ },
+ },
+ {
+ Type: "onap.policies.native.opa",
+ TypeVersion: "1.0",
+ Properties: PolicyProperties{
+ Data: map[string]string{
+ "threshold": "75",
+ "duration": "30",
+ },
+ Policy: map[string]string{
+ "policyKey2": "policyValue2",
+ },
+ },
+ Name: "MyPerformancePolicy",
+ Version: "1.0.0",
+ Metadata: Metadata{
+ PolicyID: "policy-id-002",
+ PolicyVersion: "1.0",
+ },
+ },
+ }
+
pdpUpdate := PdpUpdate{
Source: "source1",
PdpHeartbeatIntervalMs: 5000,
MessageType: "PDP_UPDATE",
- PoliciesToBeDeloyed: []string{"policy1", "policy2"},
+ PoliciesToBeDeployed: policies,
Name: "ExamplePDP",
TimestampMs: 1633017600000,
PdpGroup: "Group1",
@@ -162,7 +203,7 @@ func TestPdpUpdateSerialization_Failure(t *testing.T) {
Source: "",
PdpHeartbeatIntervalMs: 5000,
MessageType: "",
- PoliciesToBeDeloyed: nil,
+ PoliciesToBeDeployed: nil,
Name: "",
TimestampMs: 0,
PdpGroup: "",
@@ -264,7 +305,6 @@ func TestPdpHealthStatusEnum(t *testing.T) {
}
-
// TestPdpMessageType_String_Success validates the string representation of valid PdpMessageType values.
func TestPdpMessageType_String_Success(t *testing.T) {
diff --git a/pkg/model/toscaconceptidentifier.go b/pkg/model/toscaconceptidentifier.go
index c9d7788..708442c 100644
--- a/pkg/model/toscaconceptidentifier.go
+++ b/pkg/model/toscaconceptidentifier.go
@@ -26,8 +26,8 @@ import (
)
type ToscaConceptIdentifier struct {
- Name string
- Version string
+ Name string `json:"name"`
+ Version string `json:"version"`
}
func NewToscaConceptIdentifier(name, version string) *ToscaConceptIdentifier {
diff --git a/pkg/model/toscapolicy.go b/pkg/model/toscapolicy.go
new file mode 100644
index 0000000..651ab17
--- /dev/null
+++ b/pkg/model/toscapolicy.go
@@ -0,0 +1,43 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+
+// Defines the ToscaPolicy message model carried in the
+// policiesToBeDeployed field of a PDP_UPDATE message.
+package model
+
+type ToscaPolicy struct {
+ Type string `json:"type"`
+ TypeVersion string `json:"type_version"`
+ Properties PolicyProperties `json:"properties"`
+ Name string `json:"name"`
+ Version string `json:"version"`
+ Metadata Metadata `json:"metadata"`
+}
+
+type PolicyProperties struct {
+ Data map[string]string `json:"data"`
+ Policy map[string]string `json:"policy"`
+}
+
+type Metadata struct {
+ PolicyID string `json:"policy-id"`
+ PolicyVersion string `json:"policy-version"`
+}
diff --git a/pkg/opasdk/opasdk.go b/pkg/opasdk/opasdk.go
index 51a34e7..81b94ce 100644
--- a/pkg/opasdk/opasdk.go
+++ b/pkg/opasdk/opasdk.go
@@ -27,18 +27,22 @@ import (
"context"
"fmt"
"io"
+ "net/http"
"os"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"sync"
"github.com/open-policy-agent/opa/sdk"
+ "github.com/open-policy-agent/opa/storage"
+ "github.com/open-policy-agent/opa/storage/inmem"
)
// Define the structs
var (
opaInstance *sdk.OPA //A singleton instance of the OPA object
once sync.Once //A sync.Once variable used to ensure that the OPA instance is initialized only once,
+ memStore storage.Store
)
// reads JSON configuration from a file and return a jsonReader
@@ -65,9 +69,11 @@ func GetOPASingletonInstance() (*sdk.OPA, error) {
var err error
once.Do(func() {
var opaErr error
+ memStore = inmem.New()
opaInstance, opaErr = sdk.New(context.Background(), sdk.Options{
// Configure your OPA instance here
V1Compatible: true,
+ Store: memStore,
})
log.Debugf("Create an instance of OPA Object")
if opaErr != nil {
@@ -88,6 +94,158 @@ func GetOPASingletonInstance() (*sdk.OPA, error) {
})
}
})
-
return opaInstance, err
}
+
+// UpsertPolicy inserts or replaces a policy module in the in-memory store within a single transaction.
+func UpsertPolicy(ctx context.Context, policyID string, policyContent []byte) error {
+ txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+ if err != nil {
+ log.Warnf("Error creating transaction: %s", err)
+ return err
+ }
+ err = memStore.UpsertPolicy(ctx, txn, policyID, policyContent)
+ if err != nil {
+ log.Warnf("Error inserting policy: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ err = memStore.Commit(ctx, txn)
+ if err != nil {
+ log.Warnf("Error committing the transaction: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ return nil
+}
+
+// DeletePolicy removes a policy module from the in-memory store.
+func DeletePolicy(ctx context.Context, policyID string) error {
+ txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+ if err != nil {
+ log.Warnf("Error creating transaction: %s", err)
+ return err
+ }
+ err = memStore.DeletePolicy(ctx, txn, policyID)
+ if err != nil {
+ log.Warnf("Error deleting policy: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ err = memStore.Commit(ctx, txn)
+ if err != nil {
+ log.Warnf("Error committing the transaction: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ return nil
+}
+
+// WriteData writes data at dataPath, creating intermediate path segments as needed.
+func WriteData(ctx context.Context, dataPath string, data interface{}) error {
+ txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+ if err != nil {
+ log.Warnf("Error creating transaction: %s", err)
+ return err
+ }
+
+ // Initialize the path if it doesn't exist
+ err = initializePath(ctx, txn, dataPath)
+ if err != nil {
+ log.Warnf("Error initializing path %s: %s", dataPath, err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+
+ err = memStore.Write(ctx, txn, storage.AddOp, storage.MustParsePath(dataPath), data)
+ if err != nil {
+ log.Warnf("Error Adding data: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ err = memStore.Commit(ctx, txn)
+ if err != nil {
+ log.Warnf("Error committing the transaction: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ return nil
+
+}
+
+// DeleteData removes the data stored at dataPath.
+func DeleteData(ctx context.Context, dataPath string) error {
+ txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+ if err != nil {
+ log.Warnf("Error creating transaction: %s", err)
+ return err
+ }
+ err = memStore.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath(dataPath), nil)
+ if err != nil {
+ log.Warnf("Error deleting data: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ err = memStore.Commit(ctx, txn)
+ if err != nil {
+ log.Warnf("Error committing the transaction: %s", err)
+ memStore.Abort(ctx, txn)
+ return err
+ }
+ return nil
+}
+
+// ListPolicies logs every stored policy and its content; exposed as an HTTP debug endpoint.
+func ListPolicies(res http.ResponseWriter, req *http.Request) {
+ ctx := context.Background()
+ rtxn, err := memStore.NewTransaction(ctx, storage.TransactionParams{Write: false})
+ if err != nil {
+ log.Warnf("Error creating transaction %s", err)
+ http.Error(res, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ policies, err := memStore.ListPolicies(ctx, rtxn)
+ if err != nil {
+ log.Warnf("Error ListPolicies %s", err)
+ memStore.Abort(ctx, rtxn)
+ http.Error(res, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ for _, policyId := range policies {
+ log.Debugf("Policy ID: %s", policyId)
+ policy, err := memStore.GetPolicy(ctx, rtxn, policyId)
+ if err != nil {
+ log.Warnf("Error GetPolicy %s", err)
+ memStore.Abort(ctx, rtxn)
+ http.Error(res, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ log.Debugf("Policy Content: %s\n", string(policy))
+ }
+ memStore.Abort(ctx, rtxn)
+ res.WriteHeader(http.StatusOK)
+ res.Write([]byte("Check logs"))
+}
+
+// initializePath creates empty objects for any missing prefix of path so a subsequent write succeeds.
+func initializePath(ctx context.Context, txn storage.Transaction, path string) error {
+ segments := storage.MustParsePath(path)
+ for i := 1; i <= len(segments); i++ {
+ subPath := segments[:i]
+ _, err := memStore.Read(ctx, txn, subPath)
+ if err != nil && storage.IsNotFound(err) {
+ // Create the intermediate path
+ log.Debugf("path not found, creating: %s", subPath.String())
+ err = memStore.Write(ctx, txn, storage.AddOp, subPath, map[string]interface{}{})
+ if err != nil {
+ log.Debugf("Error initializing path: %s", err)
+ return err
+ }
+ } else if err != nil {
+ log.Debugf("Error reading path: %s", err)
+ return err
+ }
+ }
+ return nil
+}
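initializePath walks the storage path prefix by prefix and writes an empty object wherever a segment is missing, so the final Write has a parent node to attach to. The same idea applied to plain nested maps (a stand-in for the OPA in-memory store, not the project's actual API):

```go
package main

import (
	"fmt"
	"strings"
)

// ensurePath creates an empty object for every missing segment of path,
// mirroring what initializePath does against the OPA store, and returns
// the innermost node.
func ensurePath(root map[string]interface{}, path string) map[string]interface{} {
	node := root
	for _, seg := range strings.Split(strings.Trim(path, "/"), "/") {
		child, ok := node[seg].(map[string]interface{})
		if !ok {
			child = map[string]interface{}{}
			node[seg] = child
		}
		node = child
	}
	return node
}

func main() {
	store := map[string]interface{}{}
	leaf := ensurePath(store, "/node/collab/action")
	leaf["allowed"] = true
	fmt.Println(store) // map[node:map[collab:map[action:map[allowed:true]]]]
}
```

As in the patched code, re-running the walk over an existing prefix is a no-op, so repeated writes to sibling paths share the intermediate nodes.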
diff --git a/pkg/opasdk/opasdk_test.go b/pkg/opasdk/opasdk_test.go
index 80d1a1e..2517376 100644
--- a/pkg/opasdk/opasdk_test.go
+++ b/pkg/opasdk/opasdk_test.go
@@ -20,43 +20,43 @@
package opasdk
import (
+ "bou.ke/monkey"
+ "context"
"errors"
+ "fmt"
+ "github.com/open-policy-agent/opa/sdk"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
"io"
"os"
"policy-opa-pdp/consts"
- "testing"
"sync"
- "context"
- "fmt"
- "bou.ke/monkey"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
- "github.com/open-policy-agent/opa/sdk"
+ "testing"
)
// Mock for os.Open
type MockFile struct {
- mock.Mock
+ mock.Mock
}
func (m *MockFile) Open(name string) (*os.File, error) {
- args := m.Called(name)
- return args.Get(0).(*os.File), args.Error(1)
+ args := m.Called(name)
+ return args.Get(0).(*os.File), args.Error(1)
}
// Mock for io.ReadAll
func mockReadAll(r io.Reader) ([]byte, error) {
- return []byte(`{"config": "test"}`), nil
+ return []byte(`{"config": "test"}`), nil
}
type MockSDK struct {
- mock.Mock
+ mock.Mock
}
func (m *MockSDK) New(ctx context.Context, options sdk.Options) (*sdk.OPA, error) {
- fmt.Print("Inside New Method")
- args := m.Called(ctx, options)
- return args.Get(0).(*sdk.OPA), args.Error(1)
+ fmt.Print("Inside New Method")
+ args := m.Called(ctx, options)
+ return args.Get(0).(*sdk.OPA), args.Error(1)
}
func TestGetOPASingletonInstance_ConfigurationFileNotexisting(t *testing.T) {
@@ -89,20 +89,20 @@ func TestGetOPASingletonInstance_SingletonBehavior(t *testing.T) {
}
func TestGetOPASingletonInstance_ConfigurationFileLoaded(t *testing.T) {
- tmpFile, err := os.CreateTemp("", "config.json")
- if err != nil {
- t.Fatalf("Failed to create temp file: %v", err)
- }
- defer os.Remove(tmpFile.Name())
+ tmpFile, err := os.CreateTemp("", "config.json")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
- consts.OpasdkConfigPath = tmpFile.Name()
+ consts.OpasdkConfigPath = tmpFile.Name()
- // Simulate OPA instance creation
- opaInstance, err := GetOPASingletonInstance()
+ // Simulate OPA instance creation
+ opaInstance, err := GetOPASingletonInstance()
- // Assertions
- assert.Nil(t, err)
- assert.NotNil(t, opaInstance)
+ // Assertions
+ assert.Nil(t, err)
+ assert.NotNil(t, opaInstance)
}
func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) {
@@ -123,79 +123,78 @@ func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) {
}
func TestGetOPASingletonInstance_JSONReadError(t *testing.T) {
- consts.OpasdkConfigPath = "/app/config/config.json"
+ consts.OpasdkConfigPath = "/app/config/config.json"
- // Simulate an error in JSON read (e.g., corrupt file)
- mockReadAll := func(r io.Reader) ([]byte, error) {
- return nil, errors.New("Failed to read JSON file")
- }
+ // Simulate an error in JSON read (e.g., corrupt file)
+ mockReadAll := func(r io.Reader) ([]byte, error) {
+ return nil, errors.New("Failed to read JSON file")
+ }
- jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll)
- assert.NotNil(t, err)
- assert.Nil(t, jsonReader)
+ jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll)
+ assert.NotNil(t, err)
+ assert.Nil(t, jsonReader)
}
func TestGetOPASingletonInstance_ValidConfigFile(t *testing.T) {
- tmpFile, err := os.CreateTemp("", "config.json")
- if err != nil {
- t.Fatalf("Failed to create temp file: %v", err)
- }
- defer os.Remove(tmpFile.Name())
+ tmpFile, err := os.CreateTemp("", "config.json")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
- consts.OpasdkConfigPath = tmpFile.Name()
+ consts.OpasdkConfigPath = tmpFile.Name()
- // Valid JSON content
- validJSON := []byte(`{"config": "test"}`)
- err = os.WriteFile(tmpFile.Name(), validJSON, 0644)
- if err != nil {
- t.Fatalf("Failed to write valid JSON to temp file: %v", err)
- }
+ // Valid JSON content
+ validJSON := []byte(`{"config": "test"}`)
+ err = os.WriteFile(tmpFile.Name(), validJSON, 0644)
+ if err != nil {
+ t.Fatalf("Failed to write valid JSON to temp file: %v", err)
+ }
- // Call the function
- opaInstance, err := GetOPASingletonInstance()
+ // Call the function
+ opaInstance, err := GetOPASingletonInstance()
- assert.Nil(t, err)
- assert.NotNil(t, opaInstance)
+ assert.Nil(t, err)
+ assert.NotNil(t, opaInstance)
}
func TestGetJSONReader(t *testing.T) {
- // Create a mock file
- mockFile := new(MockFile)
- mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+ // Create a mock file
+ mockFile := new(MockFile)
+ mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
- // Call the function with mock functions
- jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
+ // Call the function with mock functions
+ jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
- // Check the results
- assert.NoError(t, err)
- assert.NotNil(t, jsonReader)
+ // Check the results
+ assert.NoError(t, err)
+ assert.NotNil(t, jsonReader)
- // Check the content of the jsonReader
- expectedContent := `{"config": "test"}`
- actualContent := make([]byte, len(expectedContent))
- jsonReader.Read(actualContent)
- assert.Equal(t, expectedContent, string(actualContent))
+ // Check the content of the jsonReader
+ expectedContent := `{"config": "test"}`
+ actualContent := make([]byte, len(expectedContent))
+ jsonReader.Read(actualContent)
+ assert.Equal(t, expectedContent, string(actualContent))
- // Assert that the mock methods were called
- mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+ // Assert that the mock methods were called
+ mockFile.AssertCalled(t, "Open", "/app/config/config.json")
}
func TestGetJSONReader_ReadAllError(t *testing.T) {
- mockFile := new(MockFile)
- mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+ mockFile := new(MockFile)
+ mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
- // Simulate ReadAll error
- jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) {
- return nil, io.ErrUnexpectedEOF
- })
+ // Simulate ReadAll error
+ jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) {
+ return nil, io.ErrUnexpectedEOF
+ })
- assert.Error(t, err)
- assert.Nil(t, jsonReader)
+ assert.Error(t, err)
+ assert.Nil(t, jsonReader)
- mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+ mockFile.AssertCalled(t, "Open", "/app/config/config.json")
}
-
func TestGetOPASingletonInstance(t *testing.T) {
// Call your function under test
opaInstance, err := GetOPASingletonInstance()
@@ -210,7 +209,6 @@ func TestGetOPASingletonInstance(t *testing.T) {
assert.NotNil(t, opaInstance, "OPA instance should not be nil when sdk.New fails")
}
-
// Helper to reset the singleton for testing
func resetSingleton() {
opaInstance = nil
diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go
index 8ce738b..005dd03 100644
--- a/pkg/pdpattributes/pdpattributes.go
+++ b/pkg/pdpattributes/pdpattributes.go
@@ -33,12 +33,12 @@ var (
)
func init() {
- PdpName = GenerateUniquePdpName()
+ PdpName = generateUniquePdpName()
log.Debugf("Name: %s", PdpName)
}
// Generates a unique PDP name by appending a randomly generated UUID
-func GenerateUniquePdpName() string {
+func generateUniquePdpName() string {
return "opa-" + uuid.New().String()
}
@@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) {
}
// Retrieves the current PDP subgroup value.
-func GetPdpSubgroup() string {
+func getPdpSubgroup() string {
return PdpSubgroup
}
@@ -58,7 +58,7 @@ func SetPdpHeartbeatInterval(pdpHeartbeatInterval int64) {
}
// Retrieves the current PDP heartbeat interval value.
-func GetPdpHeartbeatInterval() int64 {
+func getPdpHeartbeatInterval() int64 {
return PdpHeartbeatInterval
}
diff --git a/pkg/pdpattributes/pdpattributes_test.go b/pkg/pdpattributes/pdpattributes_test.go
index 909b4ee..c9a41c7 100644
--- a/pkg/pdpattributes/pdpattributes_test.go
+++ b/pkg/pdpattributes/pdpattributes_test.go
@@ -28,15 +28,15 @@ import (
func TestGenerateUniquePdpName_Success(t *testing.T) {
t.Run("GenerateValidPdpName", func(t *testing.T) {
- pdpName := GenerateUniquePdpName()
+ pdpName := generateUniquePdpName()
assert.Contains(t, pdpName, "opa-", "Expected PDP name to start with 'opa-'")
})
}
func TestGenerateUniquePdpName_Failure(t *testing.T) {
t.Run("UniqueNamesCheck", func(t *testing.T) {
- pdpName1 := GenerateUniquePdpName()
- pdpName2 := GenerateUniquePdpName()
+ pdpName1 := generateUniquePdpName()
+ pdpName2 := generateUniquePdpName()
assert.NotEqual(t, pdpName1, pdpName2, "Expected different UUID for each generated PDP name")
assert.Len(t, pdpName1, len("opa-")+36, "Expected length of PDP name to match 'opa-<UUID>' format")
})
@@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) {
t.Run("ValidSubgroup", func(t *testing.T) {
expectedSubgroup := "subgroup1"
SetPdpSubgroup(expectedSubgroup)
- assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value")
+ assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value")
})
}
func TestSetPdpSubgroup_Failure(t *testing.T) {
t.Run("EmptySubgroup", func(t *testing.T) {
SetPdpSubgroup("")
- assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
+ assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
})
t.Run("LargeSubgroup", func(t *testing.T) {
@@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) {
largeSubgroup[i] = 'a'
}
SetPdpSubgroup(string(largeSubgroup))
- assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value")
+ assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value")
})
}
@@ -70,19 +70,19 @@ func TestSetPdpHeartbeatInterval_Success(t *testing.T) {
t.Run("ValidHeartbeatInterval", func(t *testing.T) {
expectedInterval := int64(30)
SetPdpHeartbeatInterval(expectedInterval)
- assert.Equal(t, expectedInterval, GetPdpHeartbeatInterval(), "Expected heartbeat interval to match set value")
+ assert.Equal(t, expectedInterval, getPdpHeartbeatInterval(), "Expected heartbeat interval to match set value")
})
}
func TestSetPdpHeartbeatInterval_Failure(t *testing.T) {
t.Run("FailureHeartbeatInterval", func(t *testing.T) {
SetPdpHeartbeatInterval(-10)
- assert.Equal(t, int64(-10), GetPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values")
+ assert.Equal(t, int64(-10), getPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values")
})
t.Run("LargeHeartbeatInterval", func(t *testing.T) {
largeInterval := int64(time.Hour * 24 * 365 * 10) // 10 years in seconds
SetPdpHeartbeatInterval(largeInterval)
- assert.Equal(t, largeInterval, GetPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values")
+ assert.Equal(t, largeInterval, getPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values")
})
}
diff --git a/pkg/policymap/policy_and_data_map.go b/pkg/policymap/policy_and_data_map.go
new file mode 100644
index 0000000..5e06a59
--- /dev/null
+++ b/pkg/policymap/policy_and_data_map.go
@@ -0,0 +1,204 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+
+// Package policymap tracks the policies currently deployed to the PDP and their data/policy keys.
+package policymap
+
+import (
+ "encoding/json"
+ "fmt"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+)
+
+var (
+ LastDeployedPolicies string
+)
+
+func formatPolicyAndDataMap(deployedPolicies []map[string]interface{}) (string, error) {
+
+ // Create the final JSON
+ finalMap := map[string]interface{}{
+ "deployed_policies_dict": deployedPolicies,
+ }
+
+ // Marshal the final map into JSON
+ policyDataJSON, err := FormatMapofAnyType(finalMap)
+ if err != nil {
+ return "", fmt.Errorf("failed to format json: %v", err)
+ }
+
+ // Update global state
+ LastDeployedPolicies = policyDataJSON
+ log.Infof("PoliciesDeployed Map: %v", LastDeployedPolicies)
+
+ return LastDeployedPolicies, nil
+}
+
+func FormatMapofAnyType[T any](mapOfAnyType T) (string, error) {
+ // Marshal the final map into JSON
+ jsonBytes, err := json.MarshalIndent(mapOfAnyType, "", " ")
+ if err != nil {
+ return "", fmt.Errorf("failed to format json: %v", err)
+ }
+
+ return string(jsonBytes), nil
+}
+
+func UnmarshalLastDeployedPolicies(lastdeployedPolicies string) ([]map[string]interface{}, error) {
+ if len(lastdeployedPolicies) == 0 {
+ return []map[string]interface{}{}, nil
+ }
+
+ var policiesMap struct {
+ DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"`
+ }
+
+ err := json.Unmarshal([]byte(lastdeployedPolicies), &policiesMap)
+ if err != nil {
+ return nil, fmt.Errorf("failed to unmarshal LastDeployedPolicies: %v", err)
+ }
+
+ return policiesMap.DeployedPoliciesDict, nil
+}
+
+func UpdateDeployedPoliciesinMap(policy model.ToscaPolicy) (string, error) {
+
+ // Unmarshal the last known policies
+ deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ }
+
+ dataKeys := make([]string, 0, len(policy.Properties.Data))
+ policyKeys := make([]string, 0, len(policy.Properties.Policy))
+
+ for key := range policy.Properties.Data {
+ dataKeys = append(dataKeys, key)
+ }
+
+ for key := range policy.Properties.Policy {
+ policyKeys = append(policyKeys, key)
+ }
+
+ directoryMap := map[string]interface{}{
+ "policy-id": policy.Metadata.PolicyID,
+ "policy-version": policy.Metadata.PolicyVersion,
+ "data": dataKeys,
+ "policy": policyKeys,
+ }
+ deployedPolicies = append(deployedPolicies, directoryMap)
+ return formatPolicyAndDataMap(deployedPolicies)
+}
+
+func RemoveUndeployedPoliciesfromMap(undeployedPolicy map[string]interface{}) (string, error) {
+
+ // Unmarshal the last known policies
+ deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ }
+
+ remainingPolicies := []map[string]interface{}{}
+
+ for _, policy := range deployedPolicies {
+ shouldRetain := true
+ if policy["policy-id"] == undeployedPolicy["policy-id"] && policy["policy-version"] == undeployedPolicy["policy-version"] {
+ shouldRetain = false
+ }
+ if shouldRetain {
+ remainingPolicies = append(remainingPolicies, policy)
+ }
+ }
+
+ return formatPolicyAndDataMap(remainingPolicies)
+}
+
+func VerifyAndReturnPoliciesToBeDeployed(lastdeployedPoliciesMap string, pdpUpdate model.PdpUpdate) []model.ToscaPolicy {
+ type PoliciesMap struct {
+ DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"`
+ }
+
+ var policiesMap PoliciesMap
+ err := json.Unmarshal([]byte(lastdeployedPoliciesMap), &policiesMap)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ return pdpUpdate.PoliciesToBeDeployed
+ }
+
+ deployedPolicies := policiesMap.DeployedPoliciesDict
+ var policiesToBeDeployed []model.ToscaPolicy
+
+ for _, deployingPolicy := range pdpUpdate.PoliciesToBeDeployed {
+ shouldDeploy := true
+ for _, deployedPolicy := range deployedPolicies {
+ if deployedPolicy["policy-id"] == deployingPolicy.Name && deployedPolicy["policy-version"] == deployingPolicy.Version {
+ log.Infof("Policy Previously deployed: %v %v, skipping", deployingPolicy.Name, deployingPolicy.Version)
+ shouldDeploy = false
+ break
+ }
+ }
+
+ if shouldDeploy {
+ log.Infof("Policy is new and should be deployed: %v %v", deployingPolicy.Name, deployingPolicy.Version)
+ policiesToBeDeployed = append(policiesToBeDeployed, deployingPolicy)
+ }
+ }
+
+ return policiesToBeDeployed
+
+}
+
+func ExtractDeployedPolicies(policiesMap string) []model.ToscaConceptIdentifier {
+
+ // Unmarshal the last known policies
+ deployedPolicies, err := UnmarshalLastDeployedPolicies(policiesMap)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ }
+
+ pdpstatus := model.PdpStatus{
+ Policies: []model.ToscaConceptIdentifier{},
+ }
+
+ for _, policy := range deployedPolicies {
+
+ // Extract policy-id and policy-version
+ policyID, idOk := policy["policy-id"].(string)
+ policyVersion, versionOk := policy["policy-version"].(string)
+ if !idOk || !versionOk {
+ log.Warnf("Missing or invalid policy-id or policy-version")
+ return nil
+ }
+ tosca := model.ToscaConceptIdentifier{
+ Name: policyID,
+ Version: policyVersion,
+ }
+ pdpstatus.Policies = append(pdpstatus.Policies, tosca)
+ }
+ return pdpstatus.Policies
+}
+
+func GetTotalDeployedPoliciesCountFromMap() int {
+ deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+ if err != nil {
+ log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+ return 0
+ }
+ return len(deployedPolicies)
+}
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 9b405d5..313b9a6 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -21,7 +21,14 @@
package utils
import (
+ "fmt"
+ "github.com/go-playground/validator/v10"
"github.com/google/uuid"
+ "os"
+ "path/filepath"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "strings"
)
// validates if the given request is in valid uuid form
@@ -29,3 +36,181 @@ func IsValidUUID(u string) bool {
_, err := uuid.Parse(u)
return err == nil
}
+
+// Helper function to create a directory if it doesn't exist
+func CreateDirectory(dirPath string) error {
+ err := os.MkdirAll(dirPath, os.ModePerm)
+ if err != nil {
+ log.Errorf("Failed to create directory %s: %v", dirPath, err)
+ return err
+ }
+ log.Infof("Directory created: %s", dirPath)
+ return nil
+}
+
+// Helper function to check and remove a directory
+func RemoveDirectory(dirPath string) error {
+
+ entries, err := os.ReadDir(dirPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ log.Warnf("Directory does not exist: %s", dirPath)
+ // Directory does not exist, nothing to do
+ return nil
+ }
+ return fmt.Errorf("failed to read directory: %w", err)
+ }
+
+ for _, entry := range entries {
+ entryPath := filepath.Join(dirPath, entry.Name())
+
+ if entry.IsDir() {
+ // Check if the subdirectory is empty and delete it
+ isEmpty, err := isDirEmpty(entryPath)
+ if err != nil {
+ return err
+ }
+ if isEmpty {
+ log.Debugf("Removing empty subdirectory: %s", entryPath)
+ if err := os.RemoveAll(entryPath); err != nil {
+ return fmt.Errorf("failed to remove directory: %s, error: %w", entryPath, err)
+ }
+ }
+ } else {
+ // Delete specific files in the parent directory
+ if entry.Name() == "data.json" || entry.Name() == "policy.rego" {
+ log.Debugf("Removing file: %s", entryPath)
+ if err := os.Remove(entryPath); err != nil {
+ return fmt.Errorf("failed to remove file: %s, error: %w", entryPath, err)
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// Helper function to check if a directory is empty
+func isDirEmpty(dirPath string) (bool, error) {
+ entries, err := os.ReadDir(dirPath)
+ if err != nil {
+ return false, fmt.Errorf("failed to read directory: %s, error: %w", dirPath, err)
+ }
+ return len(entries) == 0, nil
+}
+
+func ValidateFieldsStructs(pdpUpdate model.PdpUpdate) error {
+ //Initialize Validator and validate Struct after unmarshalling
+ validate := validator.New()
+
+ err := validate.Struct(pdpUpdate)
+ if err != nil {
+ for _, err := range err.(validator.ValidationErrors) {
+ log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag())
+ }
+ return err
+ }
+ return err
+}
+
+// Validate validates the fields based on your requirements.
+func ValidateToscaPolicyJsonFields(policy model.ToscaPolicy) error {
+ // 1. Validate that Name, Version, and Metadata fields match.
+ emphasize := "Validation emphasizes the condition"
+ if policy.Name != policy.Metadata.PolicyID {
+ return fmt.Errorf("policy name '%s' does not match metadata policy-id '%s', '%s'", policy.Name, policy.Metadata.PolicyID, emphasize)
+ }
+ if policy.Version != policy.Metadata.PolicyVersion {
+ return fmt.Errorf("policy version '%s' does not match metadata policy-version '%s', '%s'", policy.Version, policy.Metadata.PolicyVersion, emphasize)
+ }
+
+ if policy.Properties.Data != nil {
+ // 2. Validate that Name is a suffix for keys in Properties.Data and Properties.Policy.
+ keySeen := make(map[string]bool)
+ for key := range policy.Properties.Data {
+ if keySeen[key] {
+ return fmt.Errorf("duplicate data key '%s' found, '%s'", key, emphasize)
+ }
+ keySeen[key] = true
+ if !strings.HasPrefix(key, "node."+policy.Name) {
+ return fmt.Errorf("data key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize)
+ }
+ }
+ }
+ keySeen := make(map[string]bool)
+ for key := range policy.Properties.Policy {
+ if keySeen[key] {
+ return fmt.Errorf("duplicate policy key '%s' found, '%s'", key, emphasize)
+ }
+ keySeen[key] = true
+ if !strings.HasPrefix(key, policy.Name) {
+ return fmt.Errorf("policy key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize)
+ }
+ }
+
+ return nil
+}
+
+func IsPolicyNameAllowed(policy model.ToscaPolicy, deployedPolicies []map[string]interface{}) (bool, error) {
+
+ policyID := policy.Name
+
+ if policyID == "" {
+ return false, fmt.Errorf("Policy name cannot be empty")
+ }
+
+ policyHierarchyLevel := strings.Split(policyID, ".")
+
+ for _, deployedPolicy := range deployedPolicies {
+ deployedPolicyID, ok := deployedPolicy["policy-id"].(string)
+ if !ok {
+ return false, fmt.Errorf("Invalid or missing policy-id field")
+ }
+
+ deployedPolicyIDHierarchyLevel := strings.Split(deployedPolicyID, ".")
+
+ if isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) {
+ return false, fmt.Errorf("Policy Validation Failed: policy-id %s is a parent of deployed policy %s and would override it", policyID, deployedPolicyID)
+ }
+
+ if isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) {
+ return false, fmt.Errorf("Policy Validation Failed: policy-id %s is a child of deployed policy %s and could overwrite it", policyID, deployedPolicyID)
+
+ }
+
+ }
+
+ return true, nil
+}
+
+func isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool {
+
+ // new policy should have fewer levels than deployed policy to be a parent
+ if len(policyHierarchyLevel) < len(deployedPolicyIDHierarchyLevel) {
+ for policyNameIndex := range policyHierarchyLevel {
+ if policyHierarchyLevel[policyNameIndex] != deployedPolicyIDHierarchyLevel[policyNameIndex] {
+ return false
+ }
+ }
+ return true
+
+ }
+
+ return false
+}
+
+func isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool {
+
+ // new policy should have more levels than deployed policy to be a child
+ if len(policyHierarchyLevel) > len(deployedPolicyIDHierarchyLevel) {
+ for policyNameIndex := range deployedPolicyIDHierarchyLevel {
+ if deployedPolicyIDHierarchyLevel[policyNameIndex] != policyHierarchyLevel[policyNameIndex] {
+ return false
+ }
+ }
+ return true
+
+ }
+
+ return false
+}
diff --git a/test/README.md b/test/README.md
index 96f4eed..bc9f931 100644
--- a/test/README.md
+++ b/test/README.md
@@ -1,5 +1,19 @@
# Testing OPA
+## Curl URL for Deployment
+
+1. `curl -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type: application/yaml" --data-binary @test_resources/policy_deploy_single_policy.yaml http://localhost:30002/policy/api/v1/policytypes/onap.policies.native.opa/versions/1.0.0/policies`
+
+2. `curl -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type: application/json" -d @test_resources/deploy.json http://localhost:30003/policy/pap/v1/pdps/policies`
+
+## Curl URL for Undeployment
+
+`curl -u 'policyadmin:zb!XztG34' -X DELETE http://localhost:30003/policy/pap/v1/pdps/policies/role/versions/1.0.0`, where `role` is the policy name.
+
+## Curl URL for Batch Undeployment
+
+`curl -v -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type: application/json" -d @test_resources/undeploy_batch_delete.json http://localhost:30003/policy/pap/v1/pdps/deployments/batch`
+
## Verification API Calls
curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "11:34:56", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyFilter" : [""], "policyName":"example","input":{"method":"POST","path":["users"]}}' -X POST http://0.0.0.0:8282/policy/pdpo/v1/decision
diff --git a/test/docker-compose.yml b/test/docker-compose.yml
index 273fd1d..a4dea24 100644
--- a/test/docker-compose.yml
+++ b/test/docker-compose.yml
@@ -97,13 +97,6 @@ services:
- ./scripts.sh:/app/scripts.sh
- ./Opagroup.json:/app/Opagroup.json
- ./policy-new.yaml:/app/policy-new.yaml
- - type: bind
- source: ./policies
- target: /opt/policies
- - type: bind
- source: ./data
- target: /opt/data
-
environment:
LOG_LEVEL: debug
KAFKA_URL: "kafka:9092"
diff --git a/test/test_resources/deploy.json b/test/test_resources/deploy.json
new file mode 100644
index 0000000..2698286
--- /dev/null
+++ b/test/test_resources/deploy.json
@@ -0,0 +1,10 @@
+{
+ "policies": [
+ {
+ "policy-id": "role",
+ "policy-version": "1.0.0"
+
+ }
+ ]
+}
+
diff --git a/test/test_resources/deploy_collab.json b/test/test_resources/deploy_collab.json
new file mode 100644
index 0000000..06d43c9
--- /dev/null
+++ b/test/test_resources/deploy_collab.json
@@ -0,0 +1,10 @@
+{
+ "policies": [
+ {
+ "policy-id": "collab",
+ "policy-version": "1.0.0"
+
+ }
+ ]
+}
+
diff --git a/test/test_resources/deploy_conflict.json b/test/test_resources/deploy_conflict.json
new file mode 100644
index 0000000..72d2fcd
--- /dev/null
+++ b/test/test_resources/deploy_conflict.json
@@ -0,0 +1,10 @@
+{
+ "policies": [
+ {
+ "policy-id": "conflict",
+ "policy-version": "1.0.0"
+
+ }
+ ]
+}
+
diff --git a/test/test_resources/deploy_zone.json b/test/test_resources/deploy_zone.json
new file mode 100644
index 0000000..217457b
--- /dev/null
+++ b/test/test_resources/deploy_zone.json
@@ -0,0 +1,11 @@
+
+{
+ "policies": [
+ {
+ "policy-id": "zone",
+ "policy-version": "1.0.0"
+
+ }
+ ]
+}
+
diff --git a/test/test_resources/policy_collab.yaml b/test/test_resources/policy_collab.yaml
new file mode 100644
index 0000000..e56bbed
--- /dev/null
+++ b/test/test_resources/policy_collab.yaml
@@ -0,0 +1,18 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+topology_template:
+ policies:
+ - collab:
+ type: onap.policies.native.opa
+ type_version: 1.0.0
+ properties:
+ data:
+ collab.action: ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZWRpdG9yIgogICAgICAgIF0sCiAgICAgICAgImNoYXJsaWUiOiBbCiAgICAgICAgICAgICJ2aWV3ZXIiCiAgICAgICAgXQogICAgfSwKICAgICJyb2xlX3Blcm1pc3Npb25zIjogewogICAgICAgICJhZG1pbiI6IHsKICAgICAgICAgICAgImFjdGlvbnMiOiBbCiAgICAgICAgICAgICAgICAicmVhZCIsCiAgICAgICAgICAgICAgICAid3JpdGUiLAogICAgICAgICAgICAgICAgImRlbGV0ZSIKICAgICAgICAgICAgXSwKICAgICAgICAgICAgInJlc291cmNlcyI6IFsKICAgICAgICAgICAgICAgICJzZXJ2ZXIiLAogICAgICAgICAgICAgICAgImRhdGFiYXNlIgogICAgICAgICAgICBdCiAgICAgICAgfSwKICAgICAgICAiZWRpdG9yIjogewogICAgICAgICAgICAiYWN0aW9ucyI6IFsKICAgICAgICAgICAgICAgICJyZWFkIiwKICAgICAgICAgICAgICAgICJ3cml0ZSIKICAgICAgICAgICAgXSwKICAgICAgICAgICAgInJlc291cmNlcyI6IFsKICAgICAgICAgICAgICAgICJzZXJ2ZXIiCiAgICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgICJ2aWV3ZXIiOiB7CiAgICAgICAgICAgICJhY3Rpb25zIjogWwogICAgICAgICAgICAgICAgInJlYWQiCiAgICAgICAgICAgIF0sCiAgICAgICAgICAgICJyZXNvdXJjZXMiOiBbCiAgICAgICAgICAgICAgICAic2VydmVyIgogICAgICAgICAgICBdCiAgICAgICAgfQogICAgfQp9
+ policy:
+ collab.conflict: cGFja2FnZSBjb2xsYWIuY29uZmxpY3QKCmltcG9ydCByZWdvLnYxCgphbGxvdyBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CmRlbnkgaWYgeyBpbnB1dC5uYW1lID09ICJhbGljZSIgfQoKIyBkZW55IGV2ZXJ5dGhpbmcgYnkgZGVmYXVsdApkZWZhdWx0IGF1dGh6IDo9IGZhbHNlCgojIGRlbnkgb3ZlcnJpZGVzIGFsbG93CmF1dGh6IGlmIHsKICAgIGFsbG93CiAgICBub3QgZGVueQp9Cg==
+ collab.action: cGFja2FnZSBjb2xsYWIuYWN0aW9uCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIGFkbWluIHJvbGUgaXMgZ3JhbnRlZCBwZXJtaXNzaW9uIHRvIHBlcmZvcm0gdGhlIGFjdGlvbi4KYWxsb3cgaWYgewogICAgc29tZSBpCiAgICBkYXRhLmNvbGxhYi5hY3Rpb24udXNlcl9yb2xlc1tpbnB1dC51c2VyXVtpXSA9PSByb2xlCiAgICBzb21lIGoKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLmFjdGlvbnNbal0gPT0gaW5wdXQuYWN0aW9uCiAgICBzb21lIGsKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLnJlc291cmNlc1trXSA9PSBpbnB1dC50eXBlCn0KIyAgICAgICAqIFJlZ28gY29tcGFyaXNvbiB0byBvdGhlciBzeXN0ZW1zOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0L2NvbXBhcmlzb24tdG8tb3RoZXItc3lzdGVtcy8KIyAgICAgICAqIFJlZ28gSXRlcmF0aW9uOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0LyNpdGVyYXRpb24KCg==
+ collab: cGFja2FnZSBjb2xsYWIKCmltcG9ydCByZWdvLnYxCmltcG9ydCBkYXRhLmNvbGxhYi5jb25mbGljdAppbXBvcnQgZGF0YS5jb2xsYWIuYWN0aW9uCgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCmFsbG93IGlmIHsKICAgIGNvbmZsaWN0LmFsbG93CiAgICBhY3Rpb24uYWxsb3cKfQ==
+ name: collab
+ version: 1.0.0
+ metadata:
+ policy-id: collab
+ policy-version: 1.0.0
diff --git a/test/test_resources/policy_conflict.yaml b/test/test_resources/policy_conflict.yaml
new file mode 100644
index 0000000..0406179
--- /dev/null
+++ b/test/test_resources/policy_conflict.yaml
@@ -0,0 +1,15 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+topology_template:
+ policies:
+ - conflict:
+ type: onap.policies.native.opa
+ type_version: 1.0.0
+ properties:
+ data:
+ policy:
+ conflict: cGFja2FnZSBjb25mbGljdAoKaW1wb3J0IHJlZ28udjEKCmFsbG93IGlmIHsgaW5wdXQubmFtZSA9PSAiYWxpY2UiIH0KZGVueSBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CgojIGRlbnkgZXZlcnl0aGluZyBieSBkZWZhdWx0CmRlZmF1bHQgYXV0aHogOj0gZmFsc2UKCiMgZGVueSBvdmVycmlkZXMgYWxsb3cKYXV0aHogaWYgewogICAgYWxsb3cKICAgIG5vdCBkZW55Cn0KCg==
+ name: conflict
+ version: 1.0.0
+ metadata:
+ policy-id: conflict
+ policy-version: 1.0.0
diff --git a/test/test_resources/policy_deploy_single_policy.yaml b/test/test_resources/policy_deploy_single_policy.yaml
new file mode 100644
index 0000000..872b6c6
--- /dev/null
+++ b/test/test_resources/policy_deploy_single_policy.yaml
@@ -0,0 +1,17 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+topology_template:
+ policies:
+ - role:
+ type: onap.policies.native.opa
+ type_version: 1.0.0
+ properties:
+ data:
+ role: ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K
+ policy:
+ role: cGFja2FnZSByb2xlCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgojIEFsbG93IGFkbWlucyB0byBkbyBhbnl0aGluZy4KYWxsb3cgaWYgdXNlcl9pc19hZG1pbgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIHRoZSB1c2VyIGlzIGdyYW50ZWQgcGVybWlzc2lvbiB0byBwZXJmb3JtIHRoZSBhY3Rpb24uCmFsbG93IGlmIHsKICAgICAgICAjIEZpbmQgZ3JhbnRzIGZvciB0aGUgdXNlci4KICAgICAgICBzb21lIGdyYW50IGluIHVzZXJfaXNfZ3JhbnRlZAoKICAgICAgICAjIENoZWNrIGlmIHRoZSBncmFudCBwZXJtaXRzIHRoZSBhY3Rpb24uCiAgICAgICAgaW5wdXQuYWN0aW9uID09IGdyYW50LmFjdGlvbgogICAgICAgIGlucHV0LnR5cGUgPT0gZ3JhbnQudHlwZQp9CgojIHVzZXJfaXNfYWRtaW4gaXMgdHJ1ZSBpZiAiYWRtaW4iIGlzIGFtb25nIHRoZSB1c2VyJ3Mgcm9sZXMgYXMgcGVyIGRhdGEudXNlcl9yb2xlcwp1c2VyX2lzX2FkbWluIGlmICJhZG1pbiIgaW4gZGF0YS5yb2xlLnVzZXJfcm9sZXNbaW5wdXQudXNlcl0KCiMgdXNlcl9pc19ncmFudGVkIGlzIGEgc2V0IG9mIGdyYW50cyBmb3IgdGhlIHVzZXIgaWRlbnRpZmllZCBpbiB0aGUgcmVxdWVzdC4KIyBUaGUgYGdyYW50YCB3aWxsIGJlIGNvbnRhaW5lZCBpZiB0aGUgc2V0IGB1c2VyX2lzX2dyYW50ZWRgIGZvciBldmVyeS4uLgp1c2VyX2lzX2dyYW50ZWQgY29udGFpbnMgZ3JhbnQgaWYgewogICAgICAgICMgYHJvbGVgIGFzc2lnbmVkIGFuIGVsZW1lbnQgb2YgdGhlIHVzZXJfcm9sZXMgZm9yIHRoaXMgdXNlci4uLgogICAgICAgIHNvbWUgcm9sZSBpbiBkYXRhLnJvbGUudXNlcl9yb2xlc1tpbnB1dC51c2VyXQoKICAgICAgICAjIGBncmFudGAgYXNzaWduZWQgYSBzaW5nbGUgZ3JhbnQgZnJvbSB0aGUgZ3JhbnRzIGxpc3QgZm9yICdyb2xlJy4uLgogICAgICAgIHNvbWUgZ3JhbnQgaW4gZGF0YS5yb2xlLnJvbGVfZ3JhbnRzW3JvbGVdCn0KCiMgICAgICAgKiBSZWdvIGNvbXBhcmlzb24gdG8gb3RoZXIgc3lzdGVtczogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC9jb21wYXJpc29uLXRvLW90aGVyLXN5c3RlbXMvCiMgICAgICAgKiBSZWdvIEl0ZXJhdGlvbjogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC8jaXRlcmF0aW9uCgo=
+ name: role
+ version: 1.0.0
+ metadata:
+ policy-id: role
+ policy-version: 1.0.0
+
diff --git a/test/test_resources/policy_zone.yaml b/test/test_resources/policy_zone.yaml
new file mode 100644
index 0000000..4f3ade1
--- /dev/null
+++ b/test/test_resources/policy_zone.yaml
@@ -0,0 +1,16 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+topology_template:
+ policies:
+ - zone:
+ type: onap.policies.native.opa
+ type_version: 1.0.0
+ properties:
+ data:
+ zone: ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo=
+ policy:
+        zone: cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg==
+ name: zone
+ version: 1.0.0
+ metadata:
+ policy-id: zone
+ policy-version: 1.0.0
diff --git a/test/test_resources/undeploy_batch_delete.json b/test/test_resources/undeploy_batch_delete.json
new file mode 100644
index 0000000..9195f74
--- /dev/null
+++ b/test/test_resources/undeploy_batch_delete.json
@@ -0,0 +1,23 @@
+{
+ "groups": [
+ {
+ "name": "opaGroup",
+ "deploymentSubgroups": [
+ {
+ "pdpType": "opa",
+ "action": "DELETE",
+ "policies": [
+ {
+ "name": "account",
+ "version": "1.0.0"
+ },
+ {
+ "name": "organization",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}