From 358eb80b992050e3749834b6211edb6426325020 Mon Sep 17 00:00:00 2001
From: Murali Parthasarathy K <>
Date: Wed, 12 Feb 2025 12:35:10 +0100
Subject: Design and Develop run time policy updates to OPA-PDP

Issue-ID: POLICY-5216
Change-Id: I2a1466f74106bbab7869dd82f29badd157b980bb
Signed-off-by: Murali Parthasarathy K <>
 Dockerfile                                         |   6 +-                                          |  30 +-
 api/register-handlers.go                           |   6 +-
 build/Dockerfile                                   |  77 ---
 cmd/opa-pdp/opa-pdp.go                             |  10 +-
 cmd/opa-pdp/opa-pdp_test.go                        | 599 ++++++++++-----------
 pkg/bundleserver/bundle-server.go                  |   8 +-
 pkg/bundleserver/bundle-server_test.go             |  10 +-
 pkg/decision/decision-provider.go                  | 156 ++++--
 pkg/decision/decision-provider_test.go             | 154 +++++-
 pkg/kafkacomm/handler/pdp_message_handler.go       |   6 +-
 pkg/kafkacomm/handler/pdp_state_change_handler.go  |   4 +-
 .../handler/pdp_state_change_handler_test.go       |   2 +-
 pkg/kafkacomm/handler/pdp_update_deploy_policy.go  | 390 ++++++++++++++
 .../handler/pdp_update_message_handler.go          | 143 ++++-
 .../handler/pdp_update_message_handler_test.go     |  74 ++-
 .../handler/pdp_update_undeploy_policy.go          | 196 +++++++
 pkg/kafkacomm/pdp_topic_consumer.go                |   5 +-
 pkg/kafkacomm/pdp_topic_consumer_test.go           |  54 +-
 pkg/kafkacomm/publisher/pdp-heartbeat.go           |  24 +-
 pkg/kafkacomm/publisher/pdp-heartbeat_test.go      |   3 +-
 pkg/kafkacomm/publisher/pdp-pap-registration.go    |   1 -
 .../publisher/pdp-pap-registration_test.go         |   7 +-
 pkg/kafkacomm/publisher/pdp-status-publisher.go    |  66 ++-
 .../publisher/pdp-status-publisher_test.go         |   4 +-
 pkg/metrics/counters.go                            |  94 +++-
 pkg/metrics/counters_test.go                       |   4 +-
 pkg/metrics/statistics-provider.go                 |  16 +-
 pkg/model/mesages.go                               |   8 +-
 pkg/model/messages_test.go                         |  50 +-
 pkg/model/toscaconceptidentifier.go                |   4 +-
 pkg/model/toscapolicy.go                           |  43 ++
 pkg/opasdk/opasdk.go                               | 160 +++++-
 pkg/opasdk/opasdk_test.go                          | 150 +++---
 pkg/pdpattributes/pdpattributes.go                 |   8 +-
 pkg/pdpattributes/pdpattributes_test.go            |  18 +-
 pkg/policymap/policy_and_data_map.go               | 204 +++++++
 pkg/utils/utils.go                                 | 187 ++++++-
 test/                                     |  14 +
 test/docker-compose.yml                            |   7 -
 test/test_resources/deploy.json                    |  10 +
 test/test_resources/deploy_collab.json             |  10 +
 test/test_resources/deploy_conflict.json           |  10 +
 test/test_resources/deploy_zone.json               |  11 +
 test/test_resources/policy_collab.yaml             |  18 +
 test/test_resources/policy_conflict.yaml           |  15 +
 .../policy_deploy_single_policy.yaml               |  17 +
 test/test_resources/policy_zone.yaml               |  16 +
 test/test_resources/undeploy_batch_delete.json     |  23 +
 49 files changed, 2460 insertions(+), 672 deletions(-)
 delete mode 100644 build/Dockerfile
 create mode 100644 pkg/kafkacomm/handler/pdp_update_deploy_policy.go
 create mode 100644 pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
 create mode 100644 pkg/model/toscapolicy.go
 create mode 100644 pkg/policymap/policy_and_data_map.go
 create mode 100644 test/test_resources/deploy.json
 create mode 100644 test/test_resources/deploy_collab.json
 create mode 100644 test/test_resources/deploy_conflict.json
 create mode 100644 test/test_resources/deploy_zone.json
 create mode 100644 test/test_resources/policy_collab.yaml
 create mode 100644 test/test_resources/policy_conflict.yaml
 create mode 100644 test/test_resources/policy_deploy_single_policy.yaml
 create mode 100644 test/test_resources/policy_zone.yaml
 create mode 100644 test/test_resources/undeploy_batch_delete.json

diff --git a/Dockerfile b/Dockerfile
index ecd5c49..5312d06 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
 # -
 #   ========================LICENSE_START=================================
-#   Copyright (C) 2024: Deutsche Telekom
+#   Copyright (C) 2024-2025: Deutsche Telekom
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -64,6 +64,10 @@ RUN mkdir /app
 COPY --from=compile /app /app
 RUN chmod +x /app/opa-pdp
+RUN mkdir /opt/policies
+RUN mkdir /opt/data
 # Copy our opa executable from build stage
 COPY --from=build /tmp/opa /app/opa
 RUN chmod 755 /app/opa
diff --git a/ b/
index 1254546..f9c7a2c 100644
--- a/
+++ b/
@@ -8,13 +8,13 @@ docker build -f  ./build/Dockerfile  -t opa-pdp:1.0.0 .
 1. docker image ls | grep opa-pdp
 2. inside test directory run - docker-compose down
 3.  docker-compose up -d
 4.  docker logs -f opa-pdp
 ## Generating models with openapi.yaml
 1. oapi-codegen -package=oapicodegen  -generate "models" openapi.yaml > models.go
 ## Creating new Policy
@@ -23,13 +23,13 @@ docker build -f  ./build/Dockerfile  -t opa-pdp:1.0.0 .
 2. Inside this directory create a policy [i.e; rego file] named policy.rego. Version 1 i.e v1 is supported  for rego files.
-3. For contents you can see example of  policy.rego under test/policies/role/policy.rego. 
+3. For contents you can see example of  policy.rego under test/policies/role/policy.rego.
 3. Inside test/policies/data create a new directory with the package name of policy.rego. For example test/policies/data/role
 4. Create a file data.json under the newly created directory inside data. For example test/policies/data/data.json
-5. In policy.rego the package declaration organizes the policy rules. This allows 
+5. In policy.rego the package declaration organizes the policy rules. This allows
 6. The Rule allow evaluates to true/false based on the logic defined in policy.rego
@@ -39,10 +39,28 @@ docker build -f  ./build/Dockerfile  -t opa-pdp:1.0.0 .
 9. To deploy a new policy opa-pdp need to be redpolyed i.e; docker-compose down and up need to be executed.
+## Deploying New Policy
+1. Create a tosca policy file that has policy.rego and data.json encoded contents.
+2. For example refer to test/policy_deployment.yaml.
+3. OPA emphasizes that each policy should have a unique policy-name/policy-id,
+   example:
+   Not Allowed --> when policy with name is deployed and when  not allowed for deployment since it carries the same hierarchy.
+   Allowed --> Policy with name is deployed and when is allowed for deployment since it does not have the same hierarchy.
+4. Policy and data key should start (prefixed) with policy-id. For ex refer totest/testresources/policy_deploy_single_policy.yaml.
+5. Create a deploy.json file to deploy through pap. Refer to file under test/testresources/deploy.json.
 ## Testing Decision Api
-send json 
-{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC",  "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}} 
+send json
+{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC",  "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}}
 to opa-pdp as shown in curl commands below.
 "policyName":"[packagename in rego file]/allow"
diff --git a/api/register-handlers.go b/api/register-handlers.go
index c5eb5df..0504e48 100644
--- a/api/register-handlers.go
+++ b/api/register-handlers.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import (
+	"policy-opa-pdp/pkg/opasdk"
 // RegisterHandlers registers the HTTP handlers for the service.
@@ -53,6 +54,9 @@ func RegisterHandlers() {
 	statisticsReportHandler := http.HandlerFunc(metrics.FetchCurrentStatistics)
 	http.HandleFunc("/policy/pdpo/v1/statistics", basicAuth(statisticsReportHandler))
+	listPoliciesHandler := http.HandlerFunc(opasdk.ListPolicies)
+	http.Handle("/opa/listpolicies", listPoliciesHandler)
 // handles authentication
diff --git a/build/Dockerfile b/build/Dockerfile
deleted file mode 100644
index 84359c3..0000000
--- a/build/Dockerfile
+++ /dev/null
@@ -1,77 +0,0 @@
-# -
-#   ========================LICENSE_START=================================
-#   Copyright (C) 2024: Deutsche Telekom
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#   SPDX-License-Identifier: Apache-2.0
-#   ========================LICENSE_END===================================
-FROM curlimages/curl:7.78.0 AS build
-# Get OPA
-RUN curl -Lo /tmp/opa
-FROM golang:1.23 AS compile
-RUN mkdir /app
-COPY ../go.mod ../go.sum /app/
-COPY . .
-RUN mkdir /app/cfg
-ADD ../cfg /app/cfg
-RUN mkdir /app/consts
-ADD ../consts /app/consts
-RUN mkdir /app/api
-ADD ../api /app/api
-RUN mkdir /app/cmd
-ADD ../cmd /app/cmd
-RUN mkdir /app/pkg
-ADD ../pkg /app/pkg
-RUN mkdir /app/bundles
-# Build the binary
-RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go
-#COPY config.json /app/config.json
-#RUN chmod 644 /app/config.json
-FROM ubuntu
-RUN apt-get update && apt-get install -y netcat-openbsd && rm -rf /var/lib/apt/lists/*
-RUN apt-get update && apt-get install -y curl
-# Copy our static executable from compile stage
-RUN mkdir /app
-COPY --from=compile /app /app
-RUN chmod +x /app/opa-pdp
-# Copy our opa executable from build stage
-COPY --from=build /tmp/opa /app/opa
-RUN chmod 755 /app/opa
-EXPOSE 8282
-# Command to run OPA with the policies
-CMD ["/app/opa-pdp"]
diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go
index a2cbde2..ccc9f5d 100644
--- a/cmd/opa-pdp/opa-pdp.go
+++ b/cmd/opa-pdp/opa-pdp.go
@@ -68,7 +68,8 @@ func main() {
 	// Initialize Handlers and Build Bundle
-	if err := initializeBundleFunc(exec.Command); err != nil {
+	if output, err := initializeBundleFunc(exec.Command); err != nil {
+		log.Warnf("Output %s", string(output))
 		log.Warnf("Failed to initialize bundle: %s", err)
@@ -95,11 +96,11 @@ func main() {
 	defer producer.Close()
 	sender := &publisher.RealPdpStatusSender{Producer: producer}
 	// start pdp message handler in a seperate routine
 	handleMessagesFunc(ctx, kc, sender)
 	time.Sleep(10 * time.Second)
 	// pdp registration
 	isRegistered := registerPDPFunc(sender)
 	if !isRegistered {
@@ -141,7 +142,7 @@ func initializeHandlers() {
 // build bundle tar file
-func initializeBundle(execCmd func(string, ...string) *exec.Cmd) error {
+func initializeBundle(execCmd func(string, ...string) *exec.Cmd) (string, error) {
 	return bundleserver.BuildBundle(execCmd)
@@ -228,3 +229,4 @@ myLoop:
 	time.Sleep(time.Duration(consts.SHUTDOWN_WAIT_TIME) * time.Second)
diff --git a/cmd/opa-pdp/opa-pdp_test.go b/cmd/opa-pdp/opa-pdp_test.go
index 21d9154..87fb2de 100644
--- a/cmd/opa-pdp/opa-pdp_test.go
+++ b/cmd/opa-pdp/opa-pdp_test.go
@@ -21,26 +21,26 @@ package main
 import (
+	"errors"
+	"fmt"
+	"policy-opa-pdp/pkg/kafkacomm/handler"
-	"policy-opa-pdp/pkg/kafkacomm/handler"
-	"fmt"
+	"reflect"
-	"errors"
-	"reflect"
-        ""
+	""
+	""
-	""
 // Mock objects and functions
@@ -57,8 +57,8 @@ func (m *MockKafkaConsumerInterface) Close() {
 func (m *MockKafkaConsumerInterface) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
-    args := m.Called(kc)
-    return args.Get(0).([]byte), args.Error(0)
+	args := m.Called(kc)
+	return args.Get(0).([]byte), args.Error(0)
 type MockPdpStatusSender struct {
@@ -71,11 +71,10 @@ func (m *MockPdpStatusSender) SendRegistration() error {
 func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
- args := m.Called(pdpStatus)
- return args.Error(0)
+	args := m.Called(pdpStatus)
+	return args.Error(0)
 type MockServer struct {
@@ -129,8 +128,8 @@ func TestMainFunction(t *testing.T) {
 	// Mock initializeBundle
-	initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error {
-		return nil // no error expected
+	initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) {
+		return "", nil // no error expected
 	// Use an actual *http.Server instance for testing
@@ -207,8 +206,8 @@ func TestInitializeBundle(t *testing.T) {
 	mockExecCmd := func(name string, arg ...string) *exec.Cmd {
 		return exec.Command("echo")
-	err := initializeBundle(mockExecCmd)
-	assert.NoError(t, err, "Expected no error from initializeBundle")
+	output,err := initializeBundle(mockExecCmd)
+	assert.NoError(t, err, output)
 // Test to verify that the HTTP server starts successfully.
@@ -226,415 +225,411 @@ func TestInitializeOPA(t *testing.T) {
 // Test to ensure the application correctly waits for the server to be ready.
 func TestWaitForServer(t *testing.T) {
-        waitForServerFunc = func() {
-                time.Sleep(50 * time.Millisecond)
-        }
+	waitForServerFunc = func() {
+		time.Sleep(50 * time.Millisecond)
+	}
-        waitForServer()
+	waitForServer()
 // TestInitializeHandlers
 func TestInitializeHandlers(t *testing.T) {
-        initializeHandlersFunc = func() {
-                log.Debug("Handlers initialized")
-        }
+	initializeHandlersFunc = func() {
+		log.Debug("Handlers initialized")
+	}
-        initializeHandlers()
+	initializeHandlers()
 // Test to simulate the successful registration of a PDP
 func TestRegisterPDP_Success(t *testing.T) {
- mockSender := new(MockPdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+	mockSender := new(MockPdpStatusSender)
+	mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
- result := registerPDP(mockSender)
+	result := registerPDP(mockSender)
- assert.True(t, result)
- mockSender.AssertExpectations(t)
+	assert.True(t, result)
+	mockSender.AssertExpectations(t)
 // Test to simulate a failure scenario during the registration of a PDP.
 func TestRegisterPDP_Failure(t *testing.T) {
- mockSender := new(MockPdpStatusSender)
- mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError)
+	mockSender := new(MockPdpStatusSender)
+	mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError)
- result := registerPDP(mockSender)
+	result := registerPDP(mockSender)
- assert.False(t, result)
- mockSender.AssertExpectations(t)
+	assert.False(t, result)
+	mockSender.AssertExpectations(t)
 // Test to verify that the HTTP Server starts successfully and can be shut down gracefully.
 func TestStartAndShutDownHTTPServer(t *testing.T) {
- testServer := startHTTPServer()
+	testServer := startHTTPServer()
- time.Sleep(1 * time.Second)
+	time.Sleep(1 * time.Second)
- assert.NotNil(t, testServer, "Server should be initialized")
+	assert.NotNil(t, testServer, "Server should be initialized")
- go func() {
-  err := testServer.ListenAndServe()
-  assert.Error(t, err, "Server should not return error after starting and shutting down")
- }()
+	go func() {
+		err := testServer.ListenAndServe()
+		assert.Error(t, err, "Server should not return error after starting and shutting down")
+	}()
- shutdownHTTPServer(testServer)
+	shutdownHTTPServer(testServer)
 func TestMainFunction_Failure(t *testing.T) {
-   interruptChannel := make(chan os.Signal, 1)
-   initializeOPAFunc = func() error {
-        return errors.New("OPA initialization failed")
-    }
+	interruptChannel := make(chan os.Signal, 1)
+	initializeOPAFunc = func() error {
+		return errors.New("OPA initialization failed")
+	}
-    done := make(chan struct{})
-    go func() {
-        main()
-        close(done)
-    }()
+	done := make(chan struct{})
+	go func() {
+		main()
+		close(done)
+	}()
-    interruptChannel <- os.Interrupt
+	interruptChannel <- os.Interrupt
-    select {
-    case <-done:
-    case <-time.After(1 * time.Second):
-        t.Error("main function timed out on failure scenario")
-    }
+	select {
+	case <-done:
+	case <-time.After(1 * time.Second):
+		t.Error("main function timed out on failure scenario")
+	}
 // Test to verify that the application handles errors during the shutdown process gracefully.
 func TestHandleShutdown_ErrorScenario(t *testing.T) {
-    mockConsumer := new(mocks.KafkaConsumerInterface)
-    mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error"))
-    mockConsumer.On("Close").Return(errors.New("close error"))
-     mockKafkaConsumer := &kafkacomm.KafkaConsumer{
-                Consumer: mockConsumer,
-        }
-    interruptChannel := make(chan os.Signal, 1)
-    _, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    go func() {
-        time.Sleep(100 * time.Millisecond)
-        interruptChannel <- os.Interrupt
-    }()
-    done := make(chan bool)
-    go func() {
-        handleShutdown(mockKafkaConsumer, interruptChannel, cancel)
-        done <- true
-    }()
-    select {
-    case <-done:
-        mockConsumer.AssertCalled(t, "Unsubscribe")
-        mockConsumer.AssertCalled(t, "Close")
-    case <-time.After(1 * time.Second):
-        t.Error("handleShutdown timed out")
-    }
+	mockConsumer := new(mocks.KafkaConsumerInterface)
+	mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error"))
+	mockConsumer.On("Close").Return(errors.New("close error"))
+	mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+		Consumer: mockConsumer,
+	}
+	interruptChannel := make(chan os.Signal, 1)
+	_, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		interruptChannel <- os.Interrupt
+	}()
+	done := make(chan bool)
+	go func() {
+		handleShutdown(mockKafkaConsumer, interruptChannel, cancel)
+		done <- true
+	}()
+	select {
+	case <-done:
+		mockConsumer.AssertCalled(t, "Unsubscribe")
+		mockConsumer.AssertCalled(t, "Close")
+	case <-time.After(1 * time.Second):
+		t.Error("handleShutdown timed out")
+	}
 // Test to simulate errors during the shutdown of the HTTP server.
-func TestShutdownHTTPServer_Error(t *testing.T) { 
-    mockServer := &MockServer{}
+func TestShutdownHTTPServer_Error(t *testing.T) {
+	mockServer := &MockServer{}
-    mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+	mockServer.On("Shutdown").Return(errors.New("shutdown error"))
-    shutdownHTTPServerFunc := func(s *MockServer) {
-        err := s.Shutdown()
-        if err != nil {
-            t.Logf("Expected error during shutdown: %v", err)
-        }
-    }
+	shutdownHTTPServerFunc := func(s *MockServer) {
+		err := s.Shutdown()
+		if err != nil {
+			t.Logf("Expected error during shutdown: %v", err)
+		}
+	}
-    shutdownHTTPServerFunc(mockServer)
+	shutdownHTTPServerFunc(mockServer)
-    mockServer.AssertExpectations(t)
+	mockServer.AssertExpectations(t)
 // Test to validate the successful shutdown of the HTTP server.
 func TestShutdownHTTPServerSucessful(t *testing.T) {
-    t.Run("SuccessfulShutdown", func(t *testing.T) {
-        mockServer := &MockServer{
-		Server: &http.Server{},
-	}
+	t.Run("SuccessfulShutdown", func(t *testing.T) {
+		mockServer := &MockServer{
+			Server: &http.Server{},
+		}
-        mockServer.On("Shutdown").Return(nil)
+		mockServer.On("Shutdown").Return(nil)
-        err := mockServer.Shutdown()
-        if err != nil {
-            t.Errorf("Expected no error, got: %v", err)
-        }
-	shutdownHTTPServer(mockServer.Server)
-        mockServer.AssertExpectations(t)
-    })
+		err := mockServer.Shutdown()
+		if err != nil {
+			t.Errorf("Expected no error, got: %v", err)
+		}
-    t.Run("ShutdownWithError", func(t *testing.T) {
+		shutdownHTTPServer(mockServer.Server)
+		mockServer.AssertExpectations(t)
+	})
-        mockServer := &MockServer{
-		Server: &http.Server{},
+	t.Run("ShutdownWithError", func(t *testing.T) {
-	}
+		mockServer := &MockServer{
+			Server: &http.Server{},
+		}
-        mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+		mockServer.On("Shutdown").Return(errors.New("shutdown error"))
-        err := mockServer.Shutdown()
-        if err == nil {
-            t.Error("Expected an error, but got none")
-        }
-        shutdownHTTPServer(mockServer.Server)
-        mockServer.AssertExpectations(t)
+		err := mockServer.Shutdown()
+		if err == nil {
+			t.Error("Expected an error, but got none")
+		}
+		shutdownHTTPServer(mockServer.Server)
+		mockServer.AssertExpectations(t)
-    })
+	})
 // TestHandleMessages
 func TestHandleMessages(t *testing.T) {
-    message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}`
-    mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
-    mockSender := &publisher.RealPdpStatusSender{}
-    expectedError := error(nil)
-        kafkaMsg := &kafka.Message{
-            Value: []byte(message),
-        }
-     mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg,expectedError)
-     mockConsumer := &kafkacomm.KafkaConsumer{
-                Consumer: mockKafkaConsumer,
-        }
+	message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}`
+	mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+	mockSender := &publisher.RealPdpStatusSender{}
+	expectedError := error(nil)
+	kafkaMsg := &kafka.Message{
+		Value: []byte(message),
+	}
+	mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+	mockConsumer := &kafkacomm.KafkaConsumer{
+		Consumer: mockKafkaConsumer,
+	}
-    ctx := context.Background()
-     handleMessages(ctx, mockConsumer, mockSender)
+	ctx := context.Background()
+	handleMessages(ctx, mockConsumer, mockSender)
 // Test to simulate a failure during OPA bundle initialization in the main function.
 func TestMain_InitializeBundleFailure(t *testing.T) {
-    initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error {
-        return errors.New("bundle initialization error") // Simulate error
-    }
+	initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) {
+		return "Bundle Initialization Error", errors.New("bundle initialization error") // Simulate error
+	}
-    done := make(chan struct{})
-    go func() {
-        main()
-        close(done)
-    }()
+	done := make(chan struct{})
+	go func() {
+		main()
+		close(done)
+	}()
-    select {
-    case <-done:
-    case <-time.After(1 * time.Second):
-        t.Error("main function timed out on initializeBundleFunc failure")
-    }
+	select {
+	case <-done:
+	case <-time.After(1 * time.Second):
+		t.Error("main function timed out on initializeBundleFunc failure")
+	}
 // Test to simulate a Kafka initialization failure in the main function.
 func TestMain_KafkaInitializationFailure(t *testing.T) {
-    startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
-        return nil, nil, errors.New("kafka initialization failed")
-    }
+	startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
+		return nil, nil, errors.New("kafka initialization failed")
+	}
-    done := make(chan struct{})
-    go func() {
-        main()
-        close(done)
-    }()
+	done := make(chan struct{})
+	go func() {
+		main()
+		close(done)
+	}()
-    select {
-    case <-done:
-        // Verify if the Kafka failure path is executed
-    case <-time.After(1 * time.Second):
-        t.Error("main function timed out on Kafka initialization failure")
-    }
+	select {
+	case <-done:
+		// Verify if the Kafka failure path is executed
+	case <-time.After(1 * time.Second):
+		t.Error("main function timed out on Kafka initialization failure")
+	}
 // Test to validate the main function's handling of shutdown signals.
 func TestMain_HandleShutdownWithSignals(t *testing.T) {
-    handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) {
-        go func() {
-            interruptChan <- os.Interrupt // Simulate SIGTERM
-        }()
-        cancel()
-    }
-    done := make(chan struct{})
-    go func() {
-        main()
-        close(done)
-    }()
-    select {
-    case <-done:
-        // Success
-    case <-time.After(1 * time.Second):
-        t.Error("main function timed out on signal handling")
-    }
+	handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) {
+		go func() {
+			interruptChan <- os.Interrupt // Simulate SIGTERM
+		}()
+		cancel()
+	}
+	done := make(chan struct{})
+	go func() {
+		main()
+		close(done)
+	}()
+	select {
+	case <-done:
+		// Success
+	case <-time.After(1 * time.Second):
+		t.Error("main function timed out on signal handling")
+	}
 var mockConsumer = &kafkacomm.KafkaConsumer{}
 var mockProducer = &kafkacomm.KafkaProducer{}
 // Test to simulate the scenario where starting the Kafka consumer fails
 func TestStartKafkaConsumerFailure(t *testing.T) {
- t.Run("Kafka consumer creation failure", func(t *testing.T) {
-  // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters)
-  monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
-   fmt.Println("Monkey patched NewKafkaConsumer is called")
-   return nil, errors.New("Kafka consumer creation error")
-  })
+	t.Run("Kafka consumer creation failure", func(t *testing.T) {
+		// Monkey patch the NewKafkaConsumer function with the correct signature (no parameters)
+		monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+			fmt.Println("Monkey patched NewKafkaConsumer is called")
+			return nil, errors.New("Kafka consumer creation error")
+		})
-  // Monkey patch the GetKafkaProducer function with the correct signature
-  monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
-   fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic)
-   return mockProducer, nil
-  })
+		// Monkey patch the GetKafkaProducer function with the correct signature
+		monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+			fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic)
+			return mockProducer, nil
+		})
-  // Call the function under test
-  consumer, producer, err := startKafkaConsAndProd()
+		// Call the function under test
+		consumer, producer, err := startKafkaConsAndProd()
-  // Assertions
-  assert.Error(t, err, "Kafka consumer creation error")
-  assert.Nil(t, consumer)
-  assert.Nil(t, producer)
+		// Assertions
+		assert.Error(t, err, "Kafka consumer creation error")
+		assert.Nil(t, consumer)
+		assert.Nil(t, producer)
-  // Unpatch the functions
-  monkey.Unpatch(kafkacomm.NewKafkaConsumer)
-  monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+		// Unpatch the functions
+		monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+		monkey.Unpatch(kafkacomm.GetKafkaProducer)
+	})
 // Test to simulate the scenario where starting the Kafka producer fails
 func TestStartKafkaProducerFailure(t *testing.T) {
- t.Run("Kafka producer creation failure", func(t *testing.T) {
-  // Monkey patch the NewKafkaConsumer function
-  monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
-   fmt.Println("Monkey patched NewKafkaConsumer is called")
-   return mockConsumer, nil
-  })
+	t.Run("Kafka producer creation failure", func(t *testing.T) {
+		// Monkey patch the NewKafkaConsumer function
+		monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+			fmt.Println("Monkey patched NewKafkaConsumer is called")
+			return mockConsumer, nil
+		})
-  // Monkey patch the GetKafkaProducer function
-  monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
-   fmt.Println("Monkey patched GetKafkaProducer is called")
-   return nil, errors.New("Kafka producer creation error")
-  })
+		// Monkey patch the GetKafkaProducer function
+		monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+			fmt.Println("Monkey patched GetKafkaProducer is called")
+			return nil, errors.New("Kafka producer creation error")
+		})
-  // Call the function under test
-  consumer, producer, err := startKafkaConsAndProd()
+		// Call the function under test
+		consumer, producer, err := startKafkaConsAndProd()
-  // Assertions
-  assert.Error(t, err, "Kafka producer creation error")
-  assert.Nil(t, consumer)
-  assert.Nil(t, producer)
+		// Assertions
+		assert.Error(t, err, "Kafka producer creation error")
+		assert.Nil(t, consumer)
+		assert.Nil(t, producer)
-  // Unpatch the functions
-  monkey.Unpatch(kafkacomm.NewKafkaConsumer)
-  monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+		// Unpatch the functions
+		monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+		monkey.Unpatch(kafkacomm.GetKafkaProducer)
+	})
 // Test to verify that both the Kafka consumer and producer start successfully
 func TestStartKafkaAndProdSuccess(t *testing.T) {
- t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
-  // Monkey patch the NewKafkaConsumer function
-  monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
-   fmt.Println("Monkey patched NewKafkaConsumer is called")
-   return mockConsumer, nil
-  })
+	t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
+		// Monkey patch the NewKafkaConsumer function
+		monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+			fmt.Println("Monkey patched NewKafkaConsumer is called")
+			return mockConsumer, nil
+		})
-  // Monkey patch the GetKafkaProducer function
-  monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
-   fmt.Println("Monkey patched GetKafkaProducer is called")
-   return mockProducer, nil
-  })
+		// Monkey patch the GetKafkaProducer function
+		monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+			fmt.Println("Monkey patched GetKafkaProducer is called")
+			return mockProducer, nil
+		})
-  // Call the function under test
-  consumer, producer, err := startKafkaConsAndProd()
+		// Call the function under test
+		consumer, producer, err := startKafkaConsAndProd()
-  // Assertions
-  assert.NoError(t, err)
-  assert.NotNil(t, consumer)
-  assert.NotNil(t, producer)
+		// Assertions
+		assert.NoError(t, err)
+		assert.NotNil(t, consumer)
+		assert.NotNil(t, producer)
-  // Unpatch the functions
-  monkey.Unpatch(kafkacomm.NewKafkaConsumer)
-  monkey.Unpatch(kafkacomm.GetKafkaProducer)
- })
+		// Unpatch the functions
+		monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+		monkey.Unpatch(kafkacomm.GetKafkaProducer)
+	})
 // Test to verify that the shutdown process handles a nil Kafka consumer gracefully
 func TestHandleShutdownWithNilConsumer(t *testing.T) {
-    consts.SHUTDOWN_WAIT_TIME = 0
-    interruptChannel := make(chan os.Signal, 1)
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    // Simulate sending an interrupt signal
-    go func() {
-        time.Sleep(500 * time.Millisecond)
-        interruptChannel <- os.Interrupt
-    }()
-    done := make(chan bool)
-    go func() {
-        handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc
-        done <- true
-    }()
-    select {
-    case <-done:
-        // Test should pass without any errors
-	assert.NotNil(t, ctx.Err(), "Expected context to br canceled")
-	assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown")
-    case <-time.After(2 * time.Second):
-        t.Error("handleShutdown with nil consumer timed out")
-    }
+	interruptChannel := make(chan os.Signal, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// Simulate sending an interrupt signal
+	go func() {
+		time.Sleep(500 * time.Millisecond)
+		interruptChannel <- os.Interrupt
+	}()
+	done := make(chan bool)
+	go func() {
+		handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc
+		done <- true
+	}()
+	select {
+	case <-done:
+		// Test should pass without any errors
+		assert.NotNil(t, ctx.Err(), "Expected context to br canceled")
+		assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown")
+	case <-time.After(2 * time.Second):
+		t.Error("handleShutdown with nil consumer timed out")
+	}
 // Test to simulate an error scenario in the PDP message handler while processing messages
 func TestHandleMessages_ErrorInPdpMessageHandler(t *testing.T) {
- // Mock dependencies
- mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
- mockSender := &publisher.RealPdpStatusSender{}
+	// Mock dependencies
+	mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+	mockSender := &publisher.RealPdpStatusSender{}
- // Simulate Kafka consumer returning a message
- kafkaMsg := &kafka.Message{
-  Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`),
- }
- mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil)
- mockConsumer := &kafkacomm.KafkaConsumer{
-  Consumer: mockKafkaConsumer,
- }
+	// Simulate Kafka consumer returning a message
+	kafkaMsg := &kafka.Message{
+		Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`),
+	}
+	mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil)
+	mockConsumer := &kafkacomm.KafkaConsumer{
+		Consumer: mockKafkaConsumer,
+	}
- // Patch the PdpMessageHandler to return an error
- patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
-  return errors.New("simulated error in PdpMessageHandler")
- })
- defer patch.Unpatch()
+	// Patch the PdpMessageHandler to return an error
+	patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
+		return errors.New("simulated error in PdpMessageHandler")
+	})
+	defer patch.Unpatch()
- // Call handleMessages
- ctx := context.Background()
- handleMessages(ctx, mockConsumer, mockSender)
+	// Call handleMessages
+	ctx := context.Background()
+	handleMessages(ctx, mockConsumer, mockSender)
- // No crash means the error branch was executed.
- assert.True(t, true, "handleMessages executed successfully")
+	// No crash means the error branch was executed.
+	assert.True(t, true, "handleMessages executed successfully")
 // Test to verify the behavior when the HTTP server shutdown encounters errors.
 func TestShutdownHTTPServer_Errors(t *testing.T) {
-    // Create a mock server
-    server := &http.Server{}
+	// Create a mock server
+	server := &http.Server{}
-    // Patch the Shutdown method to return an error
-    patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error {
-        return errors.New("shutdown error")
-    })
-    defer patch.Unpatch()
+	// Patch the Shutdown method to return an error
+	patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error {
+		return errors.New("shutdown error")
+	})
+	defer patch.Unpatch()
-    // Call the function
-    shutdownHTTPServer(server)
-    assert.True(t, true, "Shutdown error")
+	// Call the function
+	shutdownHTTPServer(server)
+	assert.True(t, true, "Shutdown error")
diff --git a/pkg/bundleserver/bundle-server.go b/pkg/bundleserver/bundle-server.go
index 726a4be..bd88f23 100644
--- a/pkg/bundleserver/bundle-server.go
+++ b/pkg/bundleserver/bundle-server.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telecom
+//   Copyright (C) 2024-2025: Deutsche Telecom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ func GetBundle(res http.ResponseWriter, req *http.Request) {
 // builds the OPA bundle using specified commands
-func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error {
+func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) {
 	cmd := cmdFunc(consts.Opa, consts.BuildBundle, consts.V1_COMPATIBLE, consts.Policies, consts.Data, consts.Output, consts.BundleTarGzFile)
 	log.Debugf("Before calling combinedoutput")
 	output, err := cmd.CombinedOutput()
@@ -59,8 +59,8 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error {
 	if err != nil {
 		log.Warnf("Error output : %s", string(output))
 		log.Warnf("Failed to build Bundle: %v", err)
-		return err
+		return string(output), err
 	log.Debug("Bundle Built Sucessfully....")
-	return nil
+	return string(output), nil
diff --git a/pkg/bundleserver/bundle-server_test.go b/pkg/bundleserver/bundle-server_test.go
index 676b4db..9c4f95d 100644
--- a/pkg/bundleserver/bundle-server_test.go
+++ b/pkg/bundleserver/bundle-server_test.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -94,9 +94,9 @@ func TestGetBundle_FileNotFound(t *testing.T) {
 func TestBuildBundle(t *testing.T) {
-	err := BuildBundle(mockCmd)
+	output, err := BuildBundle(mockCmd)
 	if err != nil {
-		t.Errorf("BuildBundle() error = %v, wantErr %v", err, nil)
+		t.Errorf("BuildBundle() error = %v, wantErr %v", err, output)
@@ -111,8 +111,8 @@ func TestBuildBundle_CommandFailure(t *testing.T) {
 		return cmd
-	err := BuildBundle(mockCmdFail)
+	output, err := BuildBundle(mockCmdFail)
 	if err == nil {
-		t.Errorf("BuildBundle() error = nil, wantErr %v", "command failure")
+		t.Errorf("BuildBundle() error = nil, wantErr %v", output)
diff --git a/pkg/decision/decision-provider.go b/pkg/decision/decision-provider.go
index a2cedd0..ffee901 100644
--- a/pkg/decision/decision-provider.go
+++ b/pkg/decision/decision-provider.go
@@ -24,6 +24,7 @@ package decision
 import (
+	"fmt"
 	openapi_types ""
@@ -35,6 +36,7 @@ import (
+	"policy-opa-pdp/pkg/policymap"
@@ -47,7 +49,7 @@ var httpToResponseCode = map[int]oapicodegen.ErrorResponseResponseCode{
 // Gets responsecode from map
-func GetErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode {
+func getErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode {
 	if code, exists := httpToResponseCode[httpStatus]; exists {
 		return code
@@ -91,7 +93,8 @@ func createSuccessDecisionResponseWithStatus(policyName string, output map[strin
 // creates a decision response based on the provided parameters
 func createDecisionExceptionResponse(statusCode int, errorMessage string, policyName string) *oapicodegen.ErrorResponse {
-	responseCode := GetErrorResponseResponseCode(statusCode)
+	responseCode := getErrorResponseResponseCode(statusCode)
 	return &oapicodegen.ErrorResponse{
 		ResponseCode: (*oapicodegen.ErrorResponseResponseCode)(&responseCode),
 		ErrorMessage: &errorMessage,
@@ -99,7 +102,7 @@ func createDecisionExceptionResponse(statusCode int, errorMessage string, policy
-// handles HTTP requests for decisions using OPA. 
+// handles HTTP requests for decisions using OPA.
 func OpaDecision(res http.ResponseWriter, req *http.Request) {
 	log.Debugf("PDP received a decision request.")
 	var errorDtls string
@@ -116,30 +119,80 @@ func OpaDecision(res http.ResponseWriter, req *http.Request) {
 		errorDtls = req.Method + " MethodNotAllowed"
 		httpStatus = http.StatusMethodNotAllowed
 	} else {
-		decisionReq, err := parseRequestBody(req)
-		if err != nil {
-			errorDtls = err.Error()
-			httpStatus = http.StatusBadRequest
-		} else if (decisionReq.PolicyName == "") {
-			errorDtls = "Policy Name is nil which is invalid."
-			httpStatus = http.StatusBadRequest
-		} else if (decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0)  {
-			errorDtls = "Policy Filter is nil."
-			httpStatus = http.StatusBadRequest
-		} else {
-			opa, err := getOpaInstance()
-			if err != nil {
-				errorDtls = "Failed to get OPA instance."
-				httpStatus = http.StatusInternalServerError
-				policyId = decisionReq.PolicyName
-			} else {
-				processOpaDecision(res, opa, decisionReq)
-				return
-			}
+		handleDecisionRequest(res, req, &errorDtls, &httpStatus, &policyId)
+	}
+	if errorDtls != "" {
+		sendDecisionErrorResponse(errorDtls, res, httpStatus, policyId)
+	}
+// Function to handle decision request logic
+func handleDecisionRequest(res http.ResponseWriter, req *http.Request, errorDtls *string, httpStatus *int, policyId *string) {
+	decisionReq, err := parseRequestBody(req)
+	if err != nil {
+		*errorDtls = err.Error()
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	if decisionReq.PolicyName == "" {
+		*errorDtls = "Policy Name is nil which is invalid."
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	if decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0 {
+		*errorDtls = "Policy Filter is nil."
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	decisionReq.PolicyName = strings.ReplaceAll(decisionReq.PolicyName, ".", "/")
+	handlePolicyValidation(res, decisionReq, errorDtls, httpStatus, policyId)
+// Function to handle policy validation logic
+func handlePolicyValidation(res http.ResponseWriter, decisionReq *oapicodegen.OPADecisionRequest, errorDtls *string, httpStatus *int, policyId *string) {
+	policiesMap := policymap.LastDeployedPolicies
+	if policiesMap == "" {
+		*errorDtls = "No policies are deployed."
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	extractedPolicies := policymap.ExtractDeployedPolicies(policiesMap)
+	if extractedPolicies == nil {
+		log.Warnf("No Policies extracted from Policy Map")
+		*errorDtls = "No policies are deployed."
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	if !policyExists(decisionReq.PolicyName, extractedPolicies) {
+		*errorDtls = fmt.Sprintf("Policy Name %s does not exist", decisionReq.PolicyName)
+		*httpStatus = http.StatusBadRequest
+		return
+	}
+	// Process OPA decision
+	opa, err := getOpaInstance()
+	if err != nil {
+		*errorDtls = "Failed to get OPA instance."
+		*httpStatus = http.StatusInternalServerError
+		*policyId = decisionReq.PolicyName
+		return
+	}
+	processOpaDecision(res, opa, decisionReq)
+// Function to check if policy exists in extracted policies
+func policyExists(policyName string, extractedPolicies []model.ToscaConceptIdentifier) bool {
+	for _, policy := range extractedPolicies {
+		if strings.ReplaceAll(policy.Name, ".", "/") == policyName {
+			return true
-	//If it comes here then there will be error
-	sendErrorResponse(errorDtls, res, httpStatus, policyId)
+	return false
 //This function processes the request headers
@@ -186,9 +239,10 @@ func parseRequestBody(req *http.Request) (*oapicodegen.OPADecisionRequest, error
 //This function sends the error response
-func sendErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) {
+func sendDecisionErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) {
 	log.Warnf("%s", msg)
 	decisionExc := createDecisionExceptionResponse(httpStatus, msg, policyName)
+	metrics.IncrementDecisionFailureCount()
 	writeErrorJSONResponse(res, httpStatus, msg, *decisionExc)
@@ -216,20 +270,22 @@ func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapi
 		log.Debugf("RAW opa Decision output:\n%s\n", string(jsonOutput))
 		if decisionErr != nil {
 			handleOpaDecisionError(res, decisionErr, decisionReq.PolicyName)
 		var policyFilter []string
 		if decisionReq.PolicyFilter != nil {
 			policyFilter = decisionReq.PolicyFilter
 		result, _ := decisionResult.Result.(map[string]interface{})
-		outputMap := processPolicyFilter(result, policyFilter)
-		if outputMap == nil {
-			decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, "Policy Filter is not matching.")
+		outputMap, unmatchedFilters := processPolicyFilter(result, policyFilter)
+		if len(unmatchedFilters) > 0 {
+			message := fmt.Sprintf("Policy Filter(s) not matching: [%s]", strings.Join(unmatchedFilters, ", "))
+			decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, message)
 		} else {
 			decisionRes = createSuccessDecisionResponse(decisionReq.PolicyName, outputMap)
@@ -249,39 +305,41 @@ func handleOpaDecisionError(res http.ResponseWriter, err error, policyName strin
 		writeOpaJSONResponse(res, http.StatusOK, *decisionExc)
 	} else {
-		decisionExc := createDecisionExceptionResponse(http.StatusInternalServerError, err.Error(), policyName)
-		metrics.IncrementTotalErrorCount()
-		writeErrorJSONResponse(res, http.StatusInternalServerError, err.Error(), *decisionExc)
+		sendDecisionErrorResponse(err.Error(), res, http.StatusInternalServerError, policyName)
 //This function processes the policy filters
-func processPolicyFilter(result map[string]interface{}, policyFilter []string) map[string]interface{} {
+func processPolicyFilter(result map[string]interface{}, policyFilter []string) (map[string]interface{}, []string) {
 	if len(policyFilter) > 0 {
-		filteredResult := applyPolicyFilter(result, policyFilter)
-		if filteredMap, ok := filteredResult.(map[string]interface{}); ok && len(filteredMap) > 0 {
-			return filteredMap
+		filteredResult, unmatchedFilters := applyPolicyFilter(result, policyFilter)
+		if len(filteredResult) > 0 {
+			return filteredResult, unmatchedFilters
-	return nil
+	return nil, policyFilter
 // Function to apply policy filter to decision result
-func applyPolicyFilter(result map[string]interface{}, filters []string) interface{} {
-	// Assuming filter matches specific keys or values
+func applyPolicyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string) {
 	filteredOutput := make(map[string]interface{})
+	unmatchedFilters := make(map[string]struct{})
+	for _, filter := range filters {
+		unmatchedFilters[filter] = struct{}{}
+	}
 	for key, value := range result {
 		for _, filter := range filters {
-			if strings.Contains(key, filter) {
+			if (key == filter || strings.TrimSpace(filter) == "") {
 				filteredOutput[key] = value
-				break
-			}
+				delete(unmatchedFilters, filter)
+	        }
+	    }
+	}
-		}
+	unmatchedList := make([]string, 0, len(unmatchedFilters))
+	for filter := range unmatchedFilters {
+		unmatchedList = append(unmatchedList, filter)
-	return filteredOutput
+	return filteredOutput, unmatchedList
diff --git a/pkg/decision/decision-provider_test.go b/pkg/decision/decision-provider_test.go
index ef9e36a..8ee5b04 100644
--- a/pkg/decision/decision-provider_test.go
+++ b/pkg/decision/decision-provider_test.go
@@ -35,6 +35,7 @@ import (
 	opasdk "policy-opa-pdp/pkg/opasdk"
+	"policy-opa-pdp/pkg/policymap"
@@ -97,7 +98,7 @@ func TestOpaDecision_MissingPolicyFilter(t *testing.T) {
 		return model.Active
 	defer func() { pdpstate.GetCurrentState = originalGetState }()
-	body := map[string]interface{}{"onapName": "CDS", "policyName": "data.policy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
+	body := map[string]interface{}{"onapName": "CDS", "policyName": "datapolicy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil}
 	jsonBody, _ := json.Marshal(body)
 	req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody))
@@ -237,7 +238,7 @@ func TestApplyPolicyFilter(t *testing.T) {
 		"policy2": map[string]interface{}{"key2": "value2"},
 	filter := []string{"policy1"}
-	result := applyPolicyFilter(originalPolicy, filter)
+	result,_ := applyPolicyFilter(originalPolicy, filter)
 	assert.NotNil(t, result)
 	assert.Len(t, result, 1)
@@ -351,24 +352,20 @@ var mockDecisionReq2 = oapicodegen.OPADecisionRequest{
 	PolicyFilter: []string{"allow", "filter2"},
+var mockDecisionReq3 = oapicodegen.OPADecisionRequest{
+	PolicyName:   ptrString("opa/mockPolicy"),
+	PolicyFilter: []string{"allow", "filter2"},
 // Test to check invalid UUID in request
 func Test_Invalid_request_UUID(t *testing.T) {
-	originalGetInstance := GetOPASingletonInstance
-	GetOPASingletonInstance = func() (*sdk.OPA, error) {
-		opa, err := sdk.New(context.Background(), sdk.Options{
-			ID: "mock-opa-instance",
-			// Any necessary options for mocking can go here
-		})
-		if err != nil {
-			return nil, err
-		}
-		return opa, nil
-	}
-	defer func() {
-		GetOPASingletonInstance = originalGetInstance
-	}()
-	GetOPASingletonInstance = originalGetInstance
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	originalGetState := pdpstate.GetCurrentState
 	pdpstate.GetCurrentState = func() model.PdpState {
 		return model.Active
@@ -380,7 +377,7 @@ func Test_Invalid_request_UUID(t *testing.T) {
 	body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
 	jsonBody, _ := json.Marshal(body)
 	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
-	req.Header.Set("X-ONAP-RequestID", "valid-uuid")
+	req.Header.Set("X-ONAP-RequestID", "invalid-uuid")
 	res := httptest.NewRecorder()
 	OpaDecision(res, req)
 	assert.Equal(t, http.StatusInternalServerError, res.Code)
@@ -424,6 +421,14 @@ func Test_valid_HTTP_method(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	var decisionReq oapicodegen.OPADecisionRequest
 	json.Unmarshal([]byte(jsonString), &decisionReq)
 	body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -458,6 +463,12 @@ func Test_Error_Marshalling(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	var decisionReq oapicodegen.OPADecisionRequest
 	json.Unmarshal([]byte(jsonString), &decisionReq)
@@ -471,6 +482,11 @@ func Test_Error_Marshalling(t *testing.T) {
 	assert.NotEmpty(t, res.Body.String())
+func mockGetOpaInstance() (*sdk.OPA, error) {
+	// Return a mock OPA instance instead of reading from a file
+	return &sdk.OPA{}, nil
 // Test for Invalid Decision error in Decision Result
 func Test_Invalid_Decision(t *testing.T) {
 	// Mock PDP state
@@ -486,7 +502,7 @@ func Test_Invalid_Decision(t *testing.T) {
 		"policyFilter": ["allow"],
 		"input": {"content": "content"}
 	// Patch the OPA Decision method to return an error
 	patch := monkey.PatchInstanceMethod(
 		reflect.TypeOf(&sdk.OPA{}), "Decision",
@@ -503,9 +519,8 @@ func Test_Invalid_Decision(t *testing.T) {
 	res := httptest.NewRecorder()
 	// Call the handler function that processes OPA decision
-	OpaDecision(res, req)
-	// Assert that the response status code is 400
+	//OpaDecision(res, req)
+	// Assert that the response status code is 200
 	assert.Equal(t, 200, res.Code)
@@ -518,13 +533,12 @@ func Test_Valid_Decision_String(t *testing.T) {
 	defer func() { pdpstate.GetCurrentState = originalGetState }()
-	// Define a request body that matches expected input format
 	jsonString := `{
 		"policyName": "s3",
 		"policyFilter": ["allow"],
 		"input": {"content": "content"}
 	// Patch the OPA Decision method to return an error
 	patch := monkey.PatchInstanceMethod(
 		reflect.TypeOf(&sdk.OPA{}), "Decision",
@@ -541,6 +555,13 @@ func Test_Valid_Decision_String(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	// Create a test HTTP request
 	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer([]byte(jsonString)))
 	req.Header.Set("Content-Type", "application/json")
@@ -560,7 +581,7 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) {
 		return model.Active
 	defer func() { pdpstate.GetCurrentState = originalGetState }()
-	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
 	var patch *monkey.PatchGuard
@@ -571,6 +592,13 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	body := map[string]interface{}{"PolicyName": jsonString}
 	jsonBody, _ := json.Marshal(body)
 	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -610,6 +638,12 @@ func Test_with_boolean_OPA_Decision(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	var decisionReq oapicodegen.OPADecisionRequest
         json.Unmarshal([]byte(jsonString), &decisionReq)
         body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -646,6 +680,13 @@ func Test_decision_Result_String(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	var decisionReq oapicodegen.OPADecisionRequest
 	json.Unmarshal([]byte(jsonString), &decisionReq)
 	body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,}
@@ -665,7 +706,7 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
 		return model.Active
 	defer func() { pdpstate.GetCurrentState = originalGetState }()
-	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
 	var patch *monkey.PatchGuard
@@ -677,6 +718,12 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	body := map[string]interface{}{"PolicyName": jsonString}
 	jsonBody, _ := json.Marshal(body)
 	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -698,6 +745,52 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
+// Test with OPA Decision with String type wth filtered result
+func Test_decision_with_slash_Result_String_with_filtered_Result(t *testing.T) {
+	originalGetState := pdpstate.GetCurrentState
+	pdpstate.GetCurrentState = func() model.PdpState {
+		return model.Active
+	}
+	defer func() { pdpstate.GetCurrentState = originalGetState }()
+	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"opa/mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
+	var patch *monkey.PatchGuard
+	patch = monkey.PatchInstanceMethod(
+		reflect.TypeOf(&sdk.OPA{}), "Decision",
+		func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+			// Simulate an error to trigger the second error block
+			return mockDecisionResult, nil
+		},
+	)
+	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "opa.mockPolicy", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
+	body := map[string]interface{}{"PolicyName": jsonString}
+	jsonBody, _ := json.Marshal(body)
+	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+	res := httptest.NewRecorder()
+	var patch1 *monkey.PatchGuard
+	patch1 = monkey.PatchInstanceMethod(
+		reflect.TypeOf(&json.Decoder{}), "Decode",
+		func(_ *json.Decoder, v interface{}) error {
+			if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+				*req = mockDecisionReq3
+			}
+			return nil
+		},
+	)
+	defer patch1.Unpatch()
+	OpaDecision(res, req)
+	assert.Equal(t, http.StatusOK, res.Code)
 // Test with OPA Decision with unexpected type wth filtered result
 func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
 	originalGetState := pdpstate.GetCurrentState
@@ -705,7 +798,7 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
 		return model.Active
 	defer func() { pdpstate.GetCurrentState = originalGetState }()
-	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+	jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}`
 	var patch *monkey.PatchGuard
@@ -717,6 +810,12 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
 	defer patch.Unpatch()
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}`
+	monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) {
+        return &sdk.OPA{}, nil // Mocked OPA instance
+    })
+    defer monkey.Unpatch(getOpaInstance)
 	body := map[string]interface{}{"PolicyName": jsonString}
 	jsonBody, _ := json.Marshal(body)
 	req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
@@ -765,3 +864,4 @@ func TestWriteErrorJSONResponse_EncodingFailure(t *testing.T) {
 	assert.Equal(t, http.StatusInternalServerError, response.StatusCode)
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go
index 8d1b9b4..235d56b 100644
--- a/pkg/kafkacomm/handler/pdp_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_message_handler.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -132,13 +132,13 @@ func PdpMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic s
 				switch opaPdpMessage.MessageType {
 				case "PDP_UPDATE":
-					err = PdpUpdateMessageHandler(message, p)
+					err = pdpUpdateMessageHandler(message, p)
 					if err != nil {
 						log.Warnf("Error processing Update Message: %v", err)
 				case "PDP_STATE_CHANGE":
-					err = PdpStateChangeMessageHandler(message, p)
+					err = pdpStateChangeMessageHandler(message, p)
 					if err != nil {
 						log.Warnf("Error processing Update Message: %v", err)
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go
index 2de89ff..a2249d5 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import (
 // Processes incoming messages indicating a PDP state change.
 // This includes updating the PDP state and sending a status response when the state transitions.
-func PdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+func pdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error {
 	var pdpStateChange model.PdpStateChange
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
index 5ad495d..8bddbca 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
@@ -95,7 +95,7 @@ func TestPdpStateChangeMessageHandler(t *testing.T) {
 			// Call the handler
-			err := PdpStateChangeMessageHandler(tt.message, mockSender)
+			err := pdpStateChangeMessageHandler(tt.message, mockSender)
 			// Check the results
 			if tt.expectError {
diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
new file mode 100644
index 0000000..5c7651c
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go
@@ -0,0 +1,390 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+// will process the update message from pap and send the pdp status response.
+package handler
+import (
+	"bytes"
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"policy-opa-pdp/consts"
+	"policy-opa-pdp/pkg/kafkacomm/publisher"
+	"policy-opa-pdp/pkg/log"
+	"policy-opa-pdp/pkg/metrics"
+	"policy-opa-pdp/pkg/model"
+	"policy-opa-pdp/pkg/opasdk"
+	"policy-opa-pdp/pkg/policymap"
+	"policy-opa-pdp/pkg/utils"
+	"strings"
+// stores policy and data files to directory.
+func createAndStorePolicyData(policy model.ToscaPolicy) error {
+	// Extract and decode policies
+	decodedPolicies, key, err := extractAndDecodePolicies(policy)
+	if err != nil {
+		log.Errorf("Failed to extract and decode policies for key : %v, %v", key, err)
+		return err
+	}
+	err = createPolicyDirectories(decodedPolicies)
+	if err != nil {
+		log.Errorf("Failed to create policy directories: %v", err)
+		return err
+	}
+	decodedData, key, err := extractAndDecodeData(policy)
+	if err != nil {
+		log.Errorf("Failed to extract and decode data: %v", err)
+		return err
+	}
+	err = createDataDirectories(decodedData)
+	if err != nil {
+		log.Errorf("Failed to create data directories: %v", err)
+		return err
+	}
+	return nil
+// Function to create directories and save policies
+func createPolicyDirectories(decodedPolicies map[string]string) error {
+	for key, decodedPolicy := range decodedPolicies {
+		policyDir := filepath.Join(basePolicyDir, filepath.Join(strings.Split(key, ".")...))
+		err := utils.CreateDirectory(policyDir)
+		if err != nil {
+			log.Errorf("Failed to create policy directory %s: %v", policyDir, err)
+			return err
+		}
+		err = os.WriteFile(filepath.Join(policyDir, "policy.rego"), []byte(decodedPolicy), os.ModePerm)
+		if err != nil {
+			log.Errorf("Failed to save policy.rego for %s: %v", key, err)
+			return err
+		}
+		log.Infof("Policy file saved: %s", filepath.Join(policyDir, "policy.rego"))
+	}
+	return nil
+// Function to create directories and save data
+func createDataDirectories(decodedData map[string]string) error {
+	for key, dataContent := range decodedData {
+		dataDir := filepath.Join(baseDataDir, filepath.Join(strings.Split(key, ".")...))
+		err := utils.CreateDirectory(dataDir)
+		if err != nil {
+			log.Errorf("Failed to create data directory %s: %v", dataDir, err)
+			return err
+		}
+		err = os.WriteFile(filepath.Join(dataDir, "data.json"), []byte(dataContent), os.ModePerm)
+		if err != nil {
+			log.Errorf("Failed to save data.json for %s: %v", key, err)
+			return err
+		}
+		log.Infof("Data file saved: %s", filepath.Join(dataDir, "data.json"))
+	}
+	return nil
+// Extract and decodes Policies from PDP_UPDATE message using Base64Decode
+func extractAndDecodePolicies(policy model.ToscaPolicy) (map[string]string, []string, error) {
+	decodedPolicies := make(map[string]string)
+	var keys []string
+	for key, encodedPolicy := range policy.Properties.Policy {
+		decodedPolicy, err := base64.StdEncoding.DecodeString(encodedPolicy)
+		if err != nil {
+			log.Errorf("Failed to decode policy for key: %v, %v", key, err)
+			return nil, nil, err
+		}
+		decodedPolicies[key] = string(decodedPolicy)
+		keys = append(keys, key)
+		log.Tracef("Decoded policy content: %s", decodedPolicy)
+		// Validate package name
+		if err := validatePackageName(key, string(decodedPolicy)); err != nil {
+			log.Errorf("Validation for Policy: %v failed, %v", key, err)
+			return nil, nil, err
+		}
+		log.Debugf("Decoded policy content for key '%s': %s", key, decodedPolicy)
+	}
+	return decodedPolicies, keys, nil
+// Validate the package name extracted from the decoded policy against the key
+func validatePackageName(key, decodedPolicyContent string) error {
+	// Extract the package name from the first line of the decoded policy content
+	lines := strings.Split(decodedPolicyContent, "\n")
+	if len(lines) == 0 {
+		return fmt.Errorf("no content found in decoded policy for key '%s'", key)
+	}
+	// Assume the first line contains the package declaration
+	packageLine := strings.TrimSpace(lines[0])
+	if !strings.HasPrefix(packageLine, "package ") {
+		return fmt.Errorf("package declaration not found in policy content for key '%s'", key)
+	}
+	// Extract the actual package name
+	packageName := strings.TrimSpace(strings.TrimPrefix(packageLine, "package "))
+	expectedPackageName := key
+	// Compare the extracted package name with the expected package name
+	if packageName != expectedPackageName {
+		return fmt.Errorf("package name mismatch for key '%s': expected '%s' but got '%s'", key, expectedPackageName, packageName)
+	}
+	return nil
+// Extract and decodes Data from PDP_UPDATE message using Base64Decode
+func extractAndDecodeData(policy model.ToscaPolicy) (map[string]string, []string, error) {
+	decodedData := make(map[string]string)
+	var keys []string
+	for key, encodedData := range policy.Properties.Data {
+		decodedContent, err := base64.StdEncoding.DecodeString(encodedData)
+		if err != nil {
+			log.Errorf("Failed to decode data for key: %v, %v", key, err)
+			return nil, nil, err
+		}
+		decodedData[key] = string(decodedContent)
+		keys = append(keys, key)
+		log.Tracef("Decoded data content: %s", decodedContent)
+	}
+	return decodedData, keys, nil
+// Function to extract folder name based on policy
+func getDirName(policy model.ToscaPolicy) []string {
+	// Split the policy name to identify the folder part (i.e., the first part before ".")
+	var dirNames []string
+	for key, _ := range policy.Properties.Data {
+		dirNames = append(dirNames, strings.ReplaceAll(consts.Data+"/"+key, ".", "/"))
+	}
+	for key, _ := range policy.Properties.Policy {
+		dirNames = append(dirNames, strings.ReplaceAll(consts.Policies+"/"+key, ".", "/"))
+	}
+	return dirNames
+// upsert policy to sdk.
+func upsertPolicy(policy model.ToscaPolicy) error {
+	decodedContent, keys, _ := extractAndDecodePolicies(policy)
+	for _, key := range keys {
+		policyContent := decodedContent[key]
+		err := opasdk.UpsertPolicy(context.Background(), key, []byte(policyContent))
+		if err != nil {
+			log.Errorf("Failed to Insert Policy %v", err)
+			return err
+		}
+	}
+	return nil
+// handles writing data to sdk.
+func upsertData(policy model.ToscaPolicy) error {
+	decodedDataContent, dataKeys, _ := extractAndDecodeData(policy)
+	for _, dataKey := range dataKeys {
+		dataContent := decodedDataContent[dataKey]
+		reader := bytes.NewReader([]byte(dataContent))
+		decoder := json.NewDecoder(reader)
+		decoder.UseNumber()
+		var wdata map[string]interface{}
+		err := decoder.Decode(&wdata)
+		if err != nil {
+			log.Errorf("Failed to Insert Data: %s: %v", policy.Name, err)
+			return err
+		}
+		keypath := "/" + strings.Replace(dataKey, ".", "/", -1)
+		err = opasdk.WriteData(context.Background(), keypath, wdata)
+		if err != nil {
+			log.Errorf("Failed to Write Data: %s: %v", policy.Name, err)
+			return err
+		}
+	}
+	return nil
+// handles policy deployment
+func handlePolicyDeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {
+	var failureMessages []string
+	successPolicies := make(map[string]string)
+	// Check if policy is deployed previously
+	pdpUpdate.PoliciesToBeDeployed = checkIfPolicyAlreadyDeployed(pdpUpdate)
+	for _, policy := range pdpUpdate.PoliciesToBeDeployed {
+		// Validate the policy
+		policyAllowed, err := validateParentPolicy(policy)
+		if err != nil {
+			log.Warnf("Tosca Policy Id validation failed for policy nameas it is a parent folder:%s, %v", policy.Name, err)
+			failureMessages = append(failureMessages, fmt.Sprintf("%s, %v", policy.Name, err))
+			metrics.IncrementDeployFailureCount()
+			metrics.IncrementTotalErrorCount()
+			continue
+		}
+		if policyAllowed {
+			log.Debugf("Policy Is Allowed: %s", policy.Name)
+		}
+		if err := utils.ValidateToscaPolicyJsonFields(policy); err != nil {
+			log.Debugf("Tosca Policy Validation Failed for policy Name: %s, %v", policy.Name, err)
+			failureMessages = append(failureMessages, fmt.Sprintf("Tosca Policy Validation failed for Policy: %s: %v", policy.Name, err))
+			metrics.IncrementDeployFailureCount()
+			metrics.IncrementTotalErrorCount()
+			continue
+		}
+		// Create and store policy data
+		if err := createAndStorePolicyData(policy); err != nil {
+			failureMessages = append(failureMessages, fmt.Sprintf("%s: %v", policy.Name, err))
+			metrics.IncrementDeployFailureCount()
+			metrics.IncrementTotalErrorCount()
+			continue
+		}
+		// Build the bundle
+		if err := verifyPolicyByBundleCreation(policy); err != nil {
+			failureMessages = append(failureMessages, fmt.Sprintf("Failed to build Rego File for %s: %v", policy.Name, err))
+			metrics.IncrementDeployFailureCount()
+			metrics.IncrementTotalErrorCount()
+			continue
+		}
+		// Upsert policy and data
+		if err := upsertPolicyAndData(policy, successPolicies); err != nil {
+			failureMessages = append(failureMessages, err.Error())
+			metrics.IncrementDeployFailureCount()
+			metrics.IncrementTotalErrorCount()
+			continue
+		} else {
+			successPolicies[policy.Name] = policy.Version
+			if _, err := policymap.UpdateDeployedPoliciesinMap(policy); err != nil {
+				log.Warnf("Failed to store policy data map after deploying policy %s: %v", policy.Name, err)
+			}
+		}
+		metrics.IncrementDeploySuccessCount()
+		log.Debugf("Loaded Policy: %s", policy.Name)
+	}
+	totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
+	metrics.SetTotalPoliciesCount(int64(totalPolicies))
+	return failureMessages, successPolicies
+// checks if policy exists in the map.
+func checkIfPolicyAlreadyDeployed(pdpUpdate model.PdpUpdate) []model.ToscaPolicy {
+	if len(policymap.LastDeployedPolicies) > 0 {
+		log.Debugf("Check if Policy is Already Deployed: %v", policymap.LastDeployedPolicies)
+		return policymap.VerifyAndReturnPoliciesToBeDeployed(policymap.LastDeployedPolicies, pdpUpdate)
+	}
+	return pdpUpdate.PoliciesToBeDeployed
+// verfies policy by creating bundle.
+func verifyPolicyByBundleCreation(policy model.ToscaPolicy) error {
+	// get directory name
+	dirNames := getDirName(policy)
+	if len(dirNames) == 0 {
+		log.Warnf("Unable to extract folder name from policy %s", policy.Name)
+		return fmt.Errorf("failed to extract folder name")
+	}
+	// create bundle
+	output, err := createBundleFunc(exec.Command, policy)
+	if err != nil {
+		log.Warnf("Failed to initialize bundle for %s: %s", policy.Name, string(output))
+		for _, dirPath := range dirNames {
+			if removeErr := utils.RemoveDirectory(dirPath); removeErr != nil {
+				log.Errorf("Error removing directory for policy %s: %v", policy.Name, removeErr)
+			}
+		}
+		log.Debugf("Directory cleanup as bundle creation failed")
+		return fmt.Errorf("failed to build bundle: %v", err)
+	}
+	return nil
+// handles Upsert func for policy and data
+func upsertPolicyAndData(policy model.ToscaPolicy, successPolicies map[string]string) error {
+	if err := upsertPolicy(policy); err != nil {
+		log.Warnf("Failed to upsert policy: %v", err)
+		return fmt.Errorf("Failed to Insert Policy: %s: %v", policy.Name, err)
+	}
+	if err := upsertData(policy); err != nil {
+		return fmt.Errorf("Failed to Write Data: %s: %v", policy.Name, err)
+	}
+	return nil
+// validates whether new policy is parent of the existing policy
+func validateParentPolicy(policy model.ToscaPolicy) (bool, error) {
+	policiesmap, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
+	if err != nil {
+		log.Warnf("Failed to extract deployed policies: %v", err)
+		return false, err
+	}
+	policyAllowed, err := utils.IsPolicyNameAllowed(policy, policiesmap)
+	if err != nil {
+		log.Warnf("Tosca Policy Id validation failed for policy nameas it is a parent folder:%s, %v", policy.Name, err)
+		return false, err
+	}
+	return policyAllowed, nil
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
index efe115c..148e5b5 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -21,30 +21,49 @@ package handler
 import (
-	""
+	"fmt"
+	"os/exec"
+	"policy-opa-pdp/consts"
+	"policy-opa-pdp/pkg/bundleserver"
+	"policy-opa-pdp/pkg/policymap"
+	"policy-opa-pdp/pkg/utils"
+	"strings"
+var (
+	basePolicyDir = consts.Policies
+	baseDataDir   = consts.Data
 // Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP).
 // It validates the incoming data, updates PDP attributes, and sends a response back to the sender.
-func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+	var failureMessages []string
 	var pdpUpdate model.PdpUpdate
+	var loggingPoliciesList string
 	err := json.Unmarshal(message, &pdpUpdate)
 	if err != nil {
 		log.Debugf("Failed to UnMarshal Messages: %v\n", err)
+		resMessage := fmt.Errorf("PDP Update Failed: %v", err)
+		if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+			log.Debugf("Failed to send update error response: %v", err)
+			return err
+		}
 		return err
-	//Initialize Validator and validate Struct after unmarshalling
-	validate := validator.New()
-	err = validate.Struct(pdpUpdate)
+	//Initialize Validator and validate Struct after unmarshalling
+	err = utils.ValidateFieldsStructs(pdpUpdate)
 	if err != nil {
-		for _, err := range err.(validator.ValidationErrors) {
-			log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag())
+		resMessage := fmt.Errorf("PDP Update Failed: %v", err)
+		if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+			log.Debugf("Failed to send update error response: %v", err)
+			return err
 		return err
@@ -54,12 +73,112 @@ func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
-	err = publisher.SendPdpUpdateResponse(p, &pdpUpdate)
-	if err != nil {
-		log.Debugf("Failed to Send Update Response Message: %v\n", err)
-		return err
+	if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
+		failureMessage, successfullyDeployedPolicies := handlePolicyDeployment(pdpUpdate, p)
+		mapJson, err := policymap.FormatMapofAnyType(successfullyDeployedPolicies)
+		if len(failureMessage) > 0 {
+			failureMessages = append(failureMessages, "{Deployment Errors:"+strings.Join(failureMessage, "")+"}")
+		}
+		if err != nil {
+			failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
+			resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyDeployedPolicies json %v", failureMessages)
+			if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+				log.Debugf("Failed to send update error response: %v", err)
+				return err
+			}
+		}
+		loggingPoliciesList = mapJson
+	}
+	// Check if "PoliciesToBeUndeployed" is empty or not
+	if len(pdpUpdate.PoliciesToBeUndeployed) > 0 {
+		log.Infof("Found Policies to be undeployed")
+		failureMessage, successfullyUndeployedPolicies := handlePolicyUndeployment(pdpUpdate, p)
+		mapJson, err := policymap.FormatMapofAnyType(successfullyUndeployedPolicies)
+		if len(failureMessage) > 0 {
+			failureMessages = append(failureMessages, "{UnDeployment Errors:"+strings.Join(failureMessage, "")+"}")
+		}
+		if err != nil {
+			failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
+			resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyUnDeployedPolicies json %v", failureMessages)
+			if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+				log.Debugf("Failed to send update error response: %v", err)
+				return err
+			}
+		}
+		loggingPoliciesList = mapJson
+	}
+	if len(pdpUpdate.PoliciesToBeDeployed) == 0 && len(pdpUpdate.PoliciesToBeUndeployed) == 0 {
+		//Response for PAP Registration
+		err = sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successfull")
+		if err != nil {
+			log.Debugf("Failed to Send Update Response Message: %v\n", err)
+			return err
+		}
+	} else {
+		//Send Response for Deployment or Undeployment or when both deployment and undeployment comes together
+		if err := sendPDPStatusResponse(pdpUpdate, p, loggingPoliciesList, failureMessages); err != nil {
+			return err
+		}
 	log.Infof("PDP_STATUS Message Sent Successfully")
 	go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
 	return nil
+// build bundle tar file
+func createBundleFunc(execCmd func(string, ...string) *exec.Cmd, toscaPolicy model.ToscaPolicy) (string, error) {
+	return bundleserver.BuildBundle(execCmd)
+func sendSuccessResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage string) error {
+	if err := publisher.SendPdpUpdateResponse(p, pdpUpdate, respMessage); err != nil {
+		return err
+	}
+	return nil
+func sendFailureResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage error) error {
+	if err := publisher.SendPdpUpdateErrorResponse(p, pdpUpdate, respMessage); err != nil {
+		return err
+	}
+	return nil
+func sendPDPStatusResponse(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender, loggingPoliciesList string, failureMessages []string) error {
+	if len(failureMessages) > 0 {
+		resMessage := fmt.Errorf("PDP Update Failed: %v", failureMessages)
+		if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil {
+			log.Warnf("Failed to send update error response: %v", err)
+			return err
+		}
+	} else {
+		if len(pdpUpdate.PoliciesToBeUndeployed) == 0 {
+			resMessage := fmt.Sprintf("PDP Update Successful for all policies: %v", loggingPoliciesList)
+			if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil {
+				log.Warnf("Failed to send update response: %v", err)
+				return err
+			}
+			log.Infof("Processed policies_to_be_deployed successfully")
+		} else if len(pdpUpdate.PoliciesToBeDeployed) == 0 {
+			resMessage := fmt.Sprintf("PDP Update Policies undeployed :%v", loggingPoliciesList)
+			if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil {
+				log.Warnf("Failed to Send Update Response Message: %v", err)
+				return err
+			}
+			log.Infof("Processed policies_to_be_undeployed successfully")
+		} else {
+			if err := sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successfull"); err != nil {
+				log.Warnf("Failed to Send Update Response Message: %v", err)
+				return err
+			}
+		}
+	}
+	return nil
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
index 4d5d7dc..d29c814 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -1,5 +1,5 @@
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -22,7 +22,9 @@ import (
+	"policy-opa-pdp/consts"
+	"policy-opa-pdp/pkg/policymap"
@@ -50,7 +52,7 @@ func TestPdpUpdateMessageHandler_Success(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.NoError(t, err)
@@ -70,7 +72,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure1(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
@@ -91,7 +93,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure2(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
@@ -112,7 +114,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure3(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
@@ -131,7 +133,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure4(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
@@ -160,7 +162,7 @@ func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error in Sending PDP Update Response"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
@@ -190,7 +192,63 @@ func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) {
 	mockSender := new(mocks.PdpStatusSender)
 	mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Invalid Interval Time for Heartbeat"))
-	err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
 	assert.Error(t, err)
+Description: Test by sending a valid input with policies to be deployed
+Input: valid input with PoliciesToBeDeployed
+Expected Output: Policies should be deployed successfully and corresponding messages sent.
+func TestPdpUpdateMessageHandler_Successful_Deployment(t *testing.T) {
+	messageString := `{
+		"source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+		"pdpHeartbeatIntervalMs":120000,
+		"policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}],
+		"policiesToBeUndeployed":[],
+		"messageName":"PDP_UPDATE",
+		"requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+		"timestampMs":1730722305297,
+		"name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+		"pdpGroup":"opaGroup",
+		"pdpSubgroup":"opa"
+	}`
+	mockSender := new(mocks.PdpStatusSender)
+	mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+	consts.Data ="/tmp/data"
+	consts.Policies ="/tmp/policies"
+	err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
+	assert.NoError(t, err)
+Description: Test by sending a valid input with policies to be deployed where the policy is already deployed
+Input: valid input with PoliciesToBeDeployed
+Expected Output: Policies should be skipping deployment since it is already present.
+func TestPdpUpdateMessageHandler_Skipping_Deployment(t *testing.T) {
+	messageString := `{
+		"source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+		"pdpHeartbeatIntervalMs":120000,
+		"policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}],
+		"policiesToBeUndeployed":[],
+		"messageName":"PDP_UPDATE",
+		"requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+		"timestampMs":1730722305297,
+		"name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+		"pdpGroup":"opaGroup",
+		"pdpSubgroup":"opa"
+	}`
+	policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"data": ["zone"],"policy": ["zone"],"policy-id": "zone","policy-version": "1.0.0"}]}`
+	mockSender := new(mocks.PdpStatusSender)
+	mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+        err := pdpUpdateMessageHandler([]byte(messageString), mockSender)
+	assert.NoError(t, err)
diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
new file mode 100644
index 0000000..9770d88
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go
@@ -0,0 +1,196 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+// will process the update message from pap and send the pdp status response.
+package handler
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"policy-opa-pdp/consts"
+	"policy-opa-pdp/pkg/kafkacomm/publisher"
+	"policy-opa-pdp/pkg/log"
+	"policy-opa-pdp/pkg/metrics"
+	"policy-opa-pdp/pkg/model"
+	"policy-opa-pdp/pkg/opasdk"
+	"policy-opa-pdp/pkg/policymap"
+	"policy-opa-pdp/pkg/utils"
+	"strings"
+// processPoliciesTobeUndeployed handles the undeployment of policies
+func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) {
+	var failureMessages []string
+	successfullyUndeployedPolicies := make(map[string]string)
+	// Unmarshal the last known policies
+	deployedPolicies, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies)
+	if err != nil {
+		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+	}
+	for policyID, policyVersion := range undeployedPolicies {
+		// Check if undeployed policy exists in deployedPolicies
+		matchedPolicy := findDeployedPolicy(policyID, policyVersion, deployedPolicies)
+		if matchedPolicy != nil {
+			// Handle undeployment for the policy
+			errs := policyUndeploymentAction(matchedPolicy)
+			if len(errs) > 0 {
+				metrics.IncrementUndeployFailureCount()
+				metrics.IncrementTotalErrorCount()
+				failureMessages = append(failureMessages, errs...)
+			}
+			deployedPoliciesMap, err := policymap.RemoveUndeployedPoliciesfromMap(matchedPolicy)
+			if err != nil {
+				log.Warnf("Policy Name: %s, Version: %s is not removed from LastDeployedPolicies", policyID, policyVersion)
+				failureMessages = append(failureMessages, "Error in removing from LastDeployedPolicies")
+			}
+			log.Debugf("Policies Map After Undeployment : %s", deployedPoliciesMap)
+			metrics.IncrementUndeploySuccessCount()
+			successfullyUndeployedPolicies[policyID] = policyVersion
+		} else {
+			// Log failure if no match is found
+			log.Debugf("Policy Name: %s, Version: %s is marked for undeployment but was not deployed", policyID, policyVersion)
+			continue
+		}
+	}
+        totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap()
+        metrics.SetTotalPoliciesCount(int64(totalPolicies))
+	return failureMessages, successfullyUndeployedPolicies
+func handlePolicyUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) {
+	// Extract undeployed policies into a dictionary
+	undeployedPoliciesDict := extractUndeployedPolicies(pdpUpdate.PoliciesToBeUndeployed)
+	// Process undeployment actions
+	errorMessages, successfullyUndeployedPolicies := processPoliciesTobeUndeployed(undeployedPoliciesDict)
+	return errorMessages, successfullyUndeployedPolicies
+// ExtractUndeployedPolicies extracts policy names and versions into a map
+func extractUndeployedPolicies(policies []model.ToscaConceptIdentifier) map[string]string {
+	undeployedPoliciesDict := make(map[string]string)
+	for _, policy := range policies {
+		undeployedPoliciesDict[policy.Name] = policy.Version
+		log.Infof("Extracted Policy Name: %s, Version: %s for undeployment", policy.Name, policy.Version)
+	}
+	return undeployedPoliciesDict
+// HandlePolicyUndeployment processes the actual undeployment actions for a policy
+func policyUndeploymentAction(policy map[string]interface{}) []string {
+	var failureMessages []string
+	// Delete "policy" sdk and directories
+	policyErrors := removePolicyFromSdkandDir(policy)
+	failureMessages = append(failureMessages, policyErrors...)
+	// Delete "data" sdk and directories
+	dataErrors := removeDataFromSdkandDir(policy)
+	failureMessages = append(failureMessages, dataErrors...)
+	return failureMessages
+// removeDataFromSdkandDir handles the "data" directories in the policy
+func removeDataFromSdkandDir(policy map[string]interface{}) []string {
+	var failureMessages []string
+	if dataKeys, ok := policy["data"].([]interface{}); ok {
+		for _, dataKey := range dataKeys {
+			keyPath := "/" + strings.Replace(dataKey.(string), ".", "/", -1)
+			log.Debugf("Deleting data from OPA at keypath: %s", keyPath)
+			if err := opasdk.DeleteData(context.Background(), keyPath); err != nil {
+				failureMessages = append(failureMessages, err.Error())
+				continue
+			}
+			if err := removeDataDirectory(keyPath); err != nil {
+				failureMessages = append(failureMessages, err.Error())
+			}
+		}
+	} else {
+		failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'data' is missing or not an array", policy["policy-id"], policy["policy-version"]))
+	}
+	return failureMessages
+// removePolicyFromSdkandDir handles the "policy" directories in the policy
+func removePolicyFromSdkandDir(policy map[string]interface{}) []string {
+	var failureMessages []string
+	if policyKeys, ok := policy["policy"].([]interface{}); ok {
+		for _, policyKey := range policyKeys {
+			keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1)
+			if err := opasdk.DeletePolicy(context.Background(), policyKey.(string)); err != nil {
+				failureMessages = append(failureMessages, err.Error())
+				continue
+			}
+			if err := removePolicyDirectory(keyPath); err != nil {
+				failureMessages = append(failureMessages, err.Error())
+			}
+		}
+	} else {
+		failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'policy' is missing or not an array", policy["policy-id"], policy["policy-version"]))
+	}
+	return failureMessages
+// RemoveDataDirectory removes a directory for data
+func removeDataDirectory(dataKey string) error {
+	dataPath := filepath.Join(consts.Data, dataKey)
+	log.Debugf("Removing data directory: %s", dataPath)
+	if err := utils.RemoveDirectory(dataPath); err != nil {
+		return fmt.Errorf("Failed to handle directory for data %s: %v", dataPath, err)
+	}
+	return nil
+// RemovePolicyDirectory removes a directory for policies
+func removePolicyDirectory(policyKey string) error {
+	policyPath := filepath.Join(consts.Policies, policyKey)
+	log.Debugf("Removing policy directory: %s", policyPath)
+	if err := utils.RemoveDirectory(policyPath); err != nil {
+		return fmt.Errorf("Failed to handle directory for policy %s: %v", policyPath, err)
+	}
+	return nil
+// findDeployedPolicy searches for a policy in deployedPolicies
+func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[string]interface{}) map[string]interface{} {
+	for _, policy := range deployedPolicies {
+		// Extract policy-id and policy-version from the deployed policy
+		id, idOk := policy["policy-id"].(string)
+		version, versionOk := policy["policy-version"].(string)
+		// Check if the deployed policy matches the undeployed policy
+		if idOk && versionOk && id == policyID && version == policyVersion {
+			return policy
+		}
+	}
+	return nil
diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go
index 3d19e6c..ce7fc4c 100644
--- a/pkg/kafkacomm/pdp_topic_consumer.go
+++ b/pkg/kafkacomm/pdp_topic_consumer.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -93,6 +93,9 @@ func NewKafkaConsumer() (*KafkaConsumer, error) {
 			configMap.SetKey("sasl.username", username)
 			configMap.SetKey("sasl.password", password)
 			configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
+			configMap.SetKey("fetch.max.bytes", 50*1024*1024)
+			configMap.SetKey("max.partition.fetch.bytes",50*1024*1024)
+                        configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024)
 			configMap.SetKey("", "30000")
 			configMap.SetKey("", "300000")
 			configMap.SetKey("enable.partition.eof", true)
diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go
index 1518474..eb2db03 100644
--- a/pkg/kafkacomm/pdp_topic_consumer_test.go
+++ b/pkg/kafkacomm/pdp_topic_consumer_test.go
@@ -20,16 +20,16 @@
 package kafkacomm
 import (
+	""
-	"policy-opa-pdp/pkg/kafkacomm/mocks"
-	"testing"
-	"sync"
-        "fmt"
+	"fmt"
-	""
+	"policy-opa-pdp/pkg/kafkacomm/mocks"
+	"sync"
+	"testing"
 var kafkaConsumerFactory = kafka.NewConsumer
@@ -175,34 +175,34 @@ func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) {
-//Helper function to reset
+// Helper function to reset
 func resetKafkaConsumerSingleton() {
-        consumerOnce = sync.Once{}
-        consumerInstance = nil
+	consumerOnce = sync.Once{}
+	consumerInstance = nil
-//Test for mock error creating consumers
+// Test for mock error creating consumers
 func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) {
-        resetKafkaConsumerSingleton()
-        monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
-                return nil, fmt.Errorf("mock error creating consumer")
-        })
-        defer monkey.Unpatch(kafka.NewConsumer)
-        consumer, err := NewKafkaConsumer()
-        assert.Nil(t, consumer)
-        assert.EqualError(t, err, "Kafka Consumer instance not created")
+	resetKafkaConsumerSingleton()
+	monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+		return nil, fmt.Errorf("mock error creating consumer")
+	})
+	defer monkey.Unpatch(kafka.NewConsumer)
+	consumer, err := NewKafkaConsumer()
+	assert.Nil(t, consumer)
+	assert.EqualError(t, err, "Kafka Consumer instance not created")
 // Test for error creating kafka instance
 func TestNewKafkaConsumer_NilConsumer(t *testing.T) {
-        resetKafkaConsumerSingleton()
-        monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
-                return nil, nil
-        })
-        defer monkey.Unpatch(kafka.NewConsumer)
-        consumer, err := NewKafkaConsumer()
-        assert.Nil(t, consumer)
-        assert.EqualError(t, err, "Kafka Consumer instance not created")
+	resetKafkaConsumerSingleton()
+	monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+		return nil, nil
+	})
+	defer monkey.Unpatch(kafka.NewConsumer)
+	consumer, err := NewKafkaConsumer()
+	assert.Nil(t, consumer)
+	assert.EqualError(t, err, "Kafka Consumer instance not created")
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go
index 42a5e70..0f68840 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go
@@ -30,6 +30,7 @@ import (
+	"policy-opa-pdp/pkg/policymap"
@@ -43,13 +44,17 @@ var (
 // Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
 func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
-	if intervalMs <= 0 {
+	mu.Lock()
+	defer mu.Unlock()
+	if intervalMs < 0 {
 		log.Errorf("Invalid interval provided: %d. Interval must be greater than zero.", intervalMs)
 		ticker = nil
+	} else if intervalMs == 0 {
+		intervalMs = currentInterval
-	mu.Lock()
-	defer mu.Unlock()
 	if ticker != nil && intervalMs == currentInterval {
 		log.Debug("Ticker is already running")
@@ -59,7 +64,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
 	if ticker != nil {
-	// StopTicker()
 	currentInterval = intervalMs
 	ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
@@ -89,12 +94,23 @@ func sendPDPHeartBeat(s PdpStatusSender) error {
 		Healthy:     model.Healthy,
 		Name:        pdpattributes.PdpName,
 		Description: "Pdp heartbeat",
+		Policies:    []model.ToscaConceptIdentifier{},
 		PdpGroup:    consts.PdpGroup,
 		PdpSubgroup: &pdpattributes.PdpSubgroup,
 	pdpStatus.RequestID = uuid.New().String()
 	pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+	policiesMap := policymap.LastDeployedPolicies
+	if policiesMap != "" {
+		if (policymap.ExtractDeployedPolicies(policiesMap)) == nil {
+			log.Warnf("No Policies extracted from Policy Map")
+		} else {
+			pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap)
+		}
+	}
 	err := s.SendPdpStatus(pdpStatus)
 	log.Debugf("Sending Heartbeat ...")
 	if err != nil {
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
index ba72c77..c3676c8 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
@@ -25,8 +25,7 @@ import (
-	)
 Success Case 1
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go
index 5525f61..c726b2b 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go
@@ -54,7 +54,6 @@ func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
 		log.Warnf("failed to marshal PdpStatus to JSON: %v", err)
 		return err
-	log.Debugf("Producer saved in RealPdp StatusSender")
 	kafkaMessage := &kafka.Message{
 		TopicPartition: kafka.TopicPartition{
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
index 84013cb..8618eeb 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
@@ -22,14 +22,14 @@ package publisher
 import (
+	""
+	""
-	"time"
-	""
-	""
+	"time"
 type MockPdpStatusSender struct {
@@ -140,4 +140,3 @@ func TestSendPdpStatus_Failure(t *testing.T) {
 	// Verify that the Produce method was called exactly once
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go
index 4a13b1c..3e25780 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go
@@ -29,6 +29,7 @@ import (
+	"policy-opa-pdp/pkg/policymap"
@@ -36,10 +37,10 @@ import (
 // Sends a PDP_STATUS message to indicate the successful processing of a PDP_UPDATE request
 // received from the Policy Administration Point (PAP).
-func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error {
+func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error {
 	responseStatus := model.Success
-	responseMessage := "PDP Update was Successful"
+	responseMessage := resMessage
 	pdpStatus := model.PdpStatus{
 		MessageType: model.PDP_STATUS,
@@ -50,7 +51,7 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
 		Description: "Pdp Status Response Message For Pdp Update",
 		PdpGroup:    consts.PdpGroup,
 		PdpSubgroup: &pdpattributes.PdpSubgroup,
-		// Policies: [],
+		Policies:    []model.ToscaConceptIdentifier{},
 		PdpResponse: &model.PdpResponseDetails{
 			ResponseTo:      &pdpUpdate.RequestId,
 			ResponseStatus:  &responseStatus,
@@ -61,6 +62,16 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
 	pdpStatus.RequestID = uuid.New().String()
 	pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+	policiesMap := policymap.LastDeployedPolicies
+	if policiesMap != "" {
+		if (policymap.ExtractDeployedPolicies(policiesMap)) == nil {
+			log.Warnf("No Policies extracted from Policy Map")
+		} else {
+			pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap)
+		}
+	}
 	log.Infof("Sending PDP Status With Update Response")
 	err := s.SendPdpStatus(pdpStatus)
@@ -73,6 +84,53 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error
+func SendPdpUpdateErrorResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, err error) error {
+	responseStatus := model.Failure
+	responseMessage := fmt.Sprintf("%v", err)
+	pdpStatus := model.PdpStatus{
+		MessageType: model.PDP_STATUS,
+		PdpType:     consts.PdpType,
+		State:       pdpstate.State,
+		Healthy:     model.Healthy,
+		Name:        pdpattributes.PdpName,
+		Description: "Pdp Status Response Message For Pdp Update",
+		PdpGroup:    consts.PdpGroup,
+		PdpSubgroup: &pdpattributes.PdpSubgroup,
+		Policies:    []model.ToscaConceptIdentifier{},
+		PdpResponse: &model.PdpResponseDetails{
+			ResponseTo:      &pdpUpdate.RequestId,
+			ResponseStatus:  &responseStatus,
+			ResponseMessage: &responseMessage,
+		},
+	}
+	pdpStatus.RequestID = uuid.New().String()
+	pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+	policiesMap := policymap.LastDeployedPolicies
+	if policiesMap != "" {
+		if (policymap.ExtractDeployedPolicies(policiesMap)) == nil {
+			log.Warnf("No Policies extracted from Policy Map")
+		} else {
+			pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap)
+		}
+	}
+	log.Infof("Sending PDP Status With Update Error Response")
+	err = s.SendPdpStatus(pdpStatus)
+	if err != nil {
+		log.Warnf("Failed to send PDP Update Message : %v", err)
+		return err
+	}
+	return nil
 // Sends a PDP_STATUS message to indicate a state change in the PDP (e.g., from PASSIVE to ACTIVE).
 func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateChange) error {
@@ -87,7 +145,7 @@ func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateCh
 		Description: "Pdp Status Response Message to Pdp State Change",
 		PdpGroup:    consts.PdpGroup,
 		PdpSubgroup: &pdpattributes.PdpSubgroup,
-		// Policies: [],
+		Policies:    []model.ToscaConceptIdentifier{},
 		PdpResponse: &model.PdpResponseDetails{
 			ResponseTo:      &pdpStateChange.RequestId,
 			ResponseStatus:  &responseStatus,
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
index 83154ca..17805fe 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
@@ -35,7 +35,7 @@ func TestSendPdpUpdateResponse_Success(t *testing.T) {
 	mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
 	pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
-	err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+	err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful")
 	assert.NoError(t, err)
 	mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
@@ -48,7 +48,7 @@ func TestSendPdpUpdateResponse_Failure(t *testing.T) {
 	pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
-	err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+	err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Failure")
 	assert.Error(t, err)
diff --git a/pkg/metrics/counters.go b/pkg/metrics/counters.go
index e126554..7734cff 100644
--- a/pkg/metrics/counters.go
+++ b/pkg/metrics/counters.go
@@ -25,12 +25,13 @@ import "sync"
 var TotalErrorCount int64
 var DecisionSuccessCount int64
 var DecisionFailureCount int64
+var DeployFailureCount int64
+var DeploySuccessCount int64
+var UndeployFailureCount int64
+var UndeploySuccessCount int64
+var TotalPoliciesCount int64
 var mu sync.Mutex
 // Increment counter
 func IncrementTotalErrorCount() {
@@ -39,7 +40,7 @@ func IncrementTotalErrorCount() {
 // returns pointer to the counter
-func TotalErrorCountRef() *int64 {
+func totalErrorCountRef() *int64 {
 	defer mu.Unlock()
 	return &TotalErrorCount
@@ -53,7 +54,7 @@ func IncrementDecisionSuccessCount() {
 // returns pointer to the counter
-func TotalDecisionSuccessCountRef() *int64 {
+func totalDecisionSuccessCountRef() *int64 {
 	defer mu.Unlock()
 	return &DecisionSuccessCount
@@ -74,3 +75,84 @@ func TotalDecisionFailureCountRef() *int64 {
 	return &DecisionFailureCount
+// Increment counter
+func IncrementDeploySuccessCount() {
+        mu.Lock()
+        DeploySuccessCount++
+        mu.Unlock()
+// returns pointer to the counter
+func totalDeploySuccessCountRef() *int64 {
+        mu.Lock()
+        defer mu.Unlock()
+        return &DeploySuccessCount
+// Increment counter
+func IncrementDeployFailureCount() {
+        mu.Lock()
+        DeployFailureCount++
+        mu.Unlock()
+// returns pointer to the counter
+func totalDeployFailureCountRef() *int64 {
+        mu.Lock()
+        defer mu.Unlock()
+        return &DeployFailureCount
+// Increment counter
+func IncrementUndeploySuccessCount() {
+        mu.Lock()
+	UndeploySuccessCount++
+	mu.Unlock()
+// returns pointer to the counter
+func totalUndeploySuccessCountRef() *int64 {
+        mu.Lock()
+        defer mu.Unlock()
+        return &UndeploySuccessCount
+// Increment counter
+func IncrementUndeployFailureCount() {
+        mu.Lock()
+        UndeployFailureCount++
+        mu.Unlock()
+// returns pointer to the counter
+func totalUndeployFailureCountRef() *int64 {
+        mu.Lock()
+        defer mu.Unlock()
+        return &UndeployFailureCount
+// Increment counter
+func SetTotalPoliciesCount(newCount int64) {
+        mu.Lock()
+        TotalPoliciesCount = newCount
+        mu.Unlock()
+// returns pointer to the counter
+func totalPoliciesCountRef() *int64 {
+        mu.Lock()
+        defer mu.Unlock()
+        return &TotalPoliciesCount
diff --git a/pkg/metrics/counters_test.go b/pkg/metrics/counters_test.go
index ba8646b..852e365 100644
--- a/pkg/metrics/counters_test.go
+++ b/pkg/metrics/counters_test.go
@@ -39,7 +39,7 @@ func TestCounters(t *testing.T) {
-	assert.Equal(t, int64(5), *TotalErrorCountRef())
+	assert.Equal(t, int64(5), *totalErrorCountRef())
 	// Test IncrementQuerySuccessCount and TotalQuerySuccessCountRef
@@ -61,7 +61,7 @@ func TestCounters(t *testing.T) {
-	assert.Equal(t, int64(7), *TotalDecisionSuccessCountRef())
+	assert.Equal(t, int64(7), *totalDecisionSuccessCountRef())
 	// Test IncrementDecisionFailureCount and TotalDecisionFailureCountRef
diff --git a/pkg/metrics/statistics-provider.go b/pkg/metrics/statistics-provider.go
index 9b34839..0f826bc 100644
--- a/pkg/metrics/statistics-provider.go
+++ b/pkg/metrics/statistics-provider.go
@@ -57,20 +57,20 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
 	var statReport oapicodegen.StatisticsReport
-	statReport.DecisionSuccessCount = TotalDecisionSuccessCountRef()
+	statReport.DecisionSuccessCount = totalDecisionSuccessCountRef()
 	statReport.DecisionFailureCount = TotalDecisionFailureCountRef()
-	statReport.TotalErrorCount = TotalErrorCountRef()
+	statReport.TotalErrorCount      = totalErrorCountRef()
+	statReport.DeployFailureCount   = totalDeployFailureCountRef()
+        statReport.DeploySuccessCount   = totalDeploySuccessCountRef()
+        statReport.UndeployFailureCount = totalUndeployFailureCountRef() 
+        statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
+	statReport.TotalPoliciesCount   = totalPoliciesCountRef() 
 	// not implemented hardcoding the values to zero
 	// will be implemeneted in phase-2
-	zerovalue := int64(0)
 	onevalue := int64(1)
-	statReport.TotalPoliciesCount = &zerovalue
 	statReport.TotalPolicyTypesCount = &onevalue
-	statReport.DeployFailureCount = &zerovalue
-	statReport.DeploySuccessCount = &zerovalue
-	statReport.UndeployFailureCount = &zerovalue
-	statReport.UndeploySuccessCount = &zerovalue
 	value := int32(200)
 	statReport.Code = &value
diff --git a/pkg/model/mesages.go b/pkg/model/mesages.go
index 269f45f..966cd46 100644
--- a/pkg/model/mesages.go
+++ b/pkg/model/mesages.go
@@ -86,11 +86,11 @@ type PdpStatus struct {
 // models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/
 type PdpUpdate struct {
 	Source                 string                   `json:"source" validate:"required"`
-	PdpHeartbeatIntervalMs int64                    `json:"pdpHeartbeatIntervalMs" validate:"required"`
+	PdpHeartbeatIntervalMs int64                    `json:"pdpHeartbeatIntervalMs"`
 	MessageType            string                   `json:"messageName" validate:"required"`
-	PoliciesToBeDeloyed    []string                 `json:"policiesToBeDeployed" validate:"required"`
-	policiesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"`
-	Name                   string                   `json:"name" validate:"required"`
+	PoliciesToBeDeployed   []ToscaPolicy            `json:"policiesToBeDeployed" validate:"required"`
+	PoliciesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"`
+	Name                   string                   `json:"name"`
 	TimestampMs            int64                    `json:"timestampMs" validate:"required"`
 	PdpGroup               string                   `json:"pdpGroup" validate:"required"`
 	PdpSubgroup            string                   `json:"pdpSubgroup" validate:"required"`
diff --git a/pkg/model/messages_test.go b/pkg/model/messages_test.go
index 217cd2a..536f683 100644
--- a/pkg/model/messages_test.go
+++ b/pkg/model/messages_test.go
@@ -114,8 +114,8 @@ func (p *PdpUpdate) Validate() error {
 	if p.MessageType == "" {
 		return errors.New("MessageType is required")
-	if len(p.PoliciesToBeDeloyed) == 0 {
-		return errors.New("PoliciesToBeDeloyed is required and must contain at least one policy")
+	if len(p.PoliciesToBeDeployed) == 0 {
+		return errors.New("PoliciesToBeDeployed is required and must contain at least one policy")
 	if p.Name == "" {
 		return errors.New("Name is required")
@@ -138,11 +138,52 @@ func (p *PdpUpdate) Validate() error {
 // TestPdpUpdateSerialization_Positive tests the successful serialization of PdpUpdate.
 func TestPdpUpdateSerialization_Success(t *testing.T) {
+	policies := []ToscaPolicy{
+		{
+			Type:        "onap.policies.native.opa",
+			TypeVersion: "1.0",
+			Properties: PolicyProperties{
+				Data: map[string]string{
+					"key1": "value1",
+					"key2": "value2",
+				},
+				Policy: map[string]string{
+					"policyKey1": "policyValue1",
+				},
+			},
+			Name:    "MySecurityPolicy",
+			Version: "1.0.0",
+			Metadata: Metadata{
+				PolicyID:      "policy-id-001",
+				PolicyVersion: "1.0",
+			},
+		},
+		{
+			Type:        "onap.policies.native.opa",
+			TypeVersion: "1.0",
+			Properties: PolicyProperties{
+				Data: map[string]string{
+					"threshold": "75",
+					"duration":  "30",
+				},
+				Policy: map[string]string{
+					"policyKey2": "policyValue2",
+				},
+			},
+			Name:    "MyPerformancePolicy",
+			Version: "1.0.0",
+			Metadata: Metadata{
+				PolicyID:      "policy-id-002",
+				PolicyVersion: "1.0",
+			},
+		},
+	}
 	pdpUpdate := PdpUpdate{
 		Source:                 "source1",
 		PdpHeartbeatIntervalMs: 5000,
 		MessageType:            "PDP_UPDATE",
-		PoliciesToBeDeloyed:    []string{"policy1", "policy2"},
+		PoliciesToBeDeployed:    policies,
 		Name:                   "ExamplePDP",
 		TimestampMs:            1633017600000,
 		PdpGroup:               "Group1",
@@ -162,7 +203,7 @@ func TestPdpUpdateSerialization_Failure(t *testing.T) {
 		Source:                 "",
 		PdpHeartbeatIntervalMs: 5000,
 		MessageType:            "",
-		PoliciesToBeDeloyed:    nil,
+		PoliciesToBeDeployed:    nil,
 		Name:                   "",
 		TimestampMs:            0,
 		PdpGroup:               "",
@@ -264,7 +305,6 @@ func TestPdpHealthStatusEnum(t *testing.T) {
 // TestPdpMessageType_String_Success validates the string representation of valid PdpMessageType values.
 func TestPdpMessageType_String_Success(t *testing.T) {
diff --git a/pkg/model/toscaconceptidentifier.go b/pkg/model/toscaconceptidentifier.go
index c9d7788..708442c 100644
--- a/pkg/model/toscaconceptidentifier.go
+++ b/pkg/model/toscaconceptidentifier.go
@@ -26,8 +26,8 @@ import (
 type ToscaConceptIdentifier struct {
-	Name    string
-	Version string
+	Name    string `json:"name"`
+	Version string `json:"version"`
 func NewToscaConceptIdentifier(name, version string) *ToscaConceptIdentifier {
diff --git a/pkg/model/toscapolicy.go b/pkg/model/toscapolicy.go
new file mode 100644
index 0000000..651ab17
--- /dev/null
+++ b/pkg/model/toscapolicy.go
@@ -0,0 +1,43 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+// hold the possible values for state of PDP.
+// models-pdp/src/main/java/org/onap/policy/models/pdp/enums/
+package model
+import ()
+type ToscaPolicy struct {
+	Type        string           `json:"type"`
+	TypeVersion string           `json:"type_version"`
+	Properties  PolicyProperties `json:"properties"`
+	Name        string           `json:"name"`
+	Version     string           `json:"version"`
+	Metadata    Metadata         `json:"metadata"`
+type PolicyProperties struct {
+	Data   map[string]string `json:"data"`
+	Policy map[string]string `json:"policy"`
+type Metadata struct {
+	PolicyID      string `json:"policy-id"`
+	PolicyVersion string `json:"policy-version"`
diff --git a/pkg/opasdk/opasdk.go b/pkg/opasdk/opasdk.go
index 51a34e7..81b94ce 100644
--- a/pkg/opasdk/opasdk.go
+++ b/pkg/opasdk/opasdk.go
@@ -27,18 +27,22 @@ import (
+	"net/http"
+	""
+	""
 // Define the structs
 var (
 	opaInstance *sdk.OPA  //A singleton instance of the OPA object
 	once        sync.Once //A sync.Once variable used to ensure that the OPA instance is initialized only once,
+	memStore    storage.Store
 // reads JSON configuration from a file and return a jsonReader
@@ -65,9 +69,11 @@ func GetOPASingletonInstance() (*sdk.OPA, error) {
 	var err error
 	once.Do(func() {
 		var opaErr error
+		memStore = inmem.New()
 		opaInstance, opaErr = sdk.New(context.Background(), sdk.Options{
 			// Configure your OPA instance here
 			V1Compatible: true,
+			Store:        memStore,
 		log.Debugf("Create an instance of OPA Object")
 		if opaErr != nil {
@@ -88,6 +94,158 @@ func GetOPASingletonInstance() (*sdk.OPA, error) {
 	return opaInstance, err
+func UpsertPolicy(ctx context.Context, policyID string, policyContent []byte) error {
+	txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+	if err != nil {
+		log.Warnf("Error creating transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.UpsertPolicy(ctx, txn, policyID, policyContent)
+	if err != nil {
+		log.Warnf("Error inserting policy: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Commit(ctx, txn)
+	if err != nil {
+		log.Warnf("Error commiting the transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	return nil
+func DeletePolicy(ctx context.Context, policyID string) error {
+	txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+	if err != nil {
+		log.Warnf("Error creating transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.DeletePolicy(ctx, txn, policyID)
+	if err != nil {
+		log.Warnf("Error deleting policy: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Commit(ctx, txn)
+	if err != nil {
+		log.Warnf("Error commiting the transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	return nil
+func WriteData(ctx context.Context, dataPath string, data interface{}) error {
+	txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+	if err != nil {
+		log.Warnf("Error creating transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	// Initialize the path if it doesn't exist
+	err = initializePath(ctx, txn, dataPath)
+	if err != nil {
+		log.Warnf("Error initializling Path : %s", dataPath)
+		log.Warnf("Error : %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Write(ctx, txn, storage.AddOp, storage.MustParsePath(dataPath), data)
+	if err != nil {
+		log.Warnf("Error Adding data: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Commit(ctx, txn)
+	if err != nil {
+		log.Warnf("Error commiting the transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	return nil
+func DeleteData(ctx context.Context, dataPath string) error {
+	txn, err := memStore.NewTransaction(ctx, storage.WriteParams)
+	if err != nil {
+		log.Warnf("Error creating transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath(dataPath), nil)
+	if err != nil {
+		log.Warnf("Error deleting data: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	err = memStore.Commit(ctx, txn)
+	if err != nil {
+		log.Warnf("Error commiting the transaction: %s", err)
+		memStore.Abort(ctx, txn)
+		return err
+	}
+	return nil
+func ListPolicies(res http.ResponseWriter, req *http.Request) {
+	ctx := context.Background()
+	rtxn, err := memStore.NewTransaction(ctx, storage.TransactionParams{Write: false})
+	if err != nil {
+		log.Warnf("Error creating transaction %s", err)
+		memStore.Abort(ctx, rtxn)
+		http.Error(res, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	policies, err := memStore.ListPolicies(ctx, rtxn)
+	if err != nil {
+		log.Warnf("Error ListPolicies %s", err)
+		memStore.Abort(ctx, rtxn)
+		http.Error(res, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	for _, policyId := range policies {
+		log.Debugf("Policy ID: %s", policyId)
+		policy, err := memStore.GetPolicy(ctx, rtxn, policyId)
+		if err != nil {
+			log.Warnf("Error GetPolicy %s", err)
+			memStore.Abort(ctx, rtxn)
+			http.Error(res, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		log.Debugf("Policy Content: %s\n", string(policy))
+	}
+	memStore.Abort(ctx, rtxn)
+	res.WriteHeader(http.StatusOK)
+	res.Write([]byte("Check logs"))
+func initializePath(ctx context.Context, txn storage.Transaction, path string) error {
+	segments := storage.MustParsePath(path)
+	for i := 1; i <= len(segments); i++ {
+		subPath := segments[:i]
+		_, err := memStore.Read(ctx, txn, subPath)
+		if err != nil && storage.IsNotFound(err) {
+			// Create the intermediate path
+			log.Debugf("storage not found creating : %s", subPath.String())
+			err = memStore.Write(ctx, txn, storage.AddOp, subPath, map[string]interface{}{})
+			if err != nil {
+				log.Debugf("Error initializing path: %s", err)
+				return err
+			}
+		} else if err != nil {
+			log.Debugf("Error reading path: %s", err)
+			return err
+		}
+	}
+	return nil
diff --git a/pkg/opasdk/opasdk_test.go b/pkg/opasdk/opasdk_test.go
index 80d1a1e..2517376 100644
--- a/pkg/opasdk/opasdk_test.go
+++ b/pkg/opasdk/opasdk_test.go
@@ -20,43 +20,43 @@
 package opasdk
 import (
+	""
+	"context"
+	"fmt"
+	""
+	""
+	""
-	"testing"
-        "context"
-	"fmt"
-	""
-	""
-	""
-	""
+	"testing"
 // Mock for os.Open
 type MockFile struct {
-        mock.Mock
+	mock.Mock
 func (m *MockFile) Open(name string) (*os.File, error) {
-        args := m.Called(name)
-        return args.Get(0).(*os.File), args.Error(1)
+	args := m.Called(name)
+	return args.Get(0).(*os.File), args.Error(1)
 // Mock for io.ReadAll
 func mockReadAll(r io.Reader) ([]byte, error) {
-        return []byte(`{"config": "test"}`), nil
+	return []byte(`{"config": "test"}`), nil
 type MockSDK struct {
-    mock.Mock
+	mock.Mock
 func (m *MockSDK) New(ctx context.Context, options sdk.Options) (*sdk.OPA, error) {
-    fmt.Print("Inside New Method")
-    args := m.Called(ctx, options)
-    return args.Get(0).(*sdk.OPA), args.Error(1)
+	fmt.Print("Inside New Method")
+	args := m.Called(ctx, options)
+	return args.Get(0).(*sdk.OPA), args.Error(1)
 func TestGetOPASingletonInstance_ConfigurationFileNotexisting(t *testing.T) {
@@ -89,20 +89,20 @@ func TestGetOPASingletonInstance_SingletonBehavior(t *testing.T) {
 func TestGetOPASingletonInstance_ConfigurationFileLoaded(t *testing.T) {
-        tmpFile, err := os.CreateTemp("", "config.json")
-        if err != nil {
-                t.Fatalf("Failed to create temp file: %v", err)
-        }
-        defer os.Remove(tmpFile.Name())
+	tmpFile, err := os.CreateTemp("", "config.json")
+	if err != nil {
+		t.Fatalf("Failed to create temp file: %v", err)
+	}
+	defer os.Remove(tmpFile.Name())
-        consts.OpasdkConfigPath = tmpFile.Name()
+	consts.OpasdkConfigPath = tmpFile.Name()
-        // Simulate OPA instance creation
-        opaInstance, err := GetOPASingletonInstance()
+	// Simulate OPA instance creation
+	opaInstance, err := GetOPASingletonInstance()
-        // Assertions
-        assert.Nil(t, err)
-        assert.NotNil(t, opaInstance)
+	// Assertions
+	assert.Nil(t, err)
+	assert.NotNil(t, opaInstance)
 func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) {
@@ -123,79 +123,78 @@ func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) {
 func TestGetOPASingletonInstance_JSONReadError(t *testing.T) {
-        consts.OpasdkConfigPath = "/app/config/config.json"
+	consts.OpasdkConfigPath = "/app/config/config.json"
-        // Simulate an error in JSON read (e.g., corrupt file)
-        mockReadAll := func(r io.Reader) ([]byte, error) {
-                return nil, errors.New("Failed to read JSON file")
-        }
+	// Simulate an error in JSON read (e.g., corrupt file)
+	mockReadAll := func(r io.Reader) ([]byte, error) {
+		return nil, errors.New("Failed to read JSON file")
+	}
-        jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll)
-        assert.NotNil(t, err)
-        assert.Nil(t, jsonReader)
+	jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll)
+	assert.NotNil(t, err)
+	assert.Nil(t, jsonReader)
 func TestGetOPASingletonInstance_ValidConfigFile(t *testing.T) {
-        tmpFile, err := os.CreateTemp("", "config.json")
-        if err != nil {
-                t.Fatalf("Failed to create temp file: %v", err)
-        }
-        defer os.Remove(tmpFile.Name())
+	tmpFile, err := os.CreateTemp("", "config.json")
+	if err != nil {
+		t.Fatalf("Failed to create temp file: %v", err)
+	}
+	defer os.Remove(tmpFile.Name())
-        consts.OpasdkConfigPath = tmpFile.Name()
+	consts.OpasdkConfigPath = tmpFile.Name()
-        // Valid JSON content
-        validJSON := []byte(`{"config": "test"}`)
-        err = os.WriteFile(tmpFile.Name(), validJSON, 0644)
-        if err != nil {
-                t.Fatalf("Failed to write valid JSON to temp file: %v", err)
-        }
+	// Valid JSON content
+	validJSON := []byte(`{"config": "test"}`)
+	err = os.WriteFile(tmpFile.Name(), validJSON, 0644)
+	if err != nil {
+		t.Fatalf("Failed to write valid JSON to temp file: %v", err)
+	}
-        // Call the function
-        opaInstance, err := GetOPASingletonInstance()
+	// Call the function
+	opaInstance, err := GetOPASingletonInstance()
-        assert.Nil(t, err)
-        assert.NotNil(t, opaInstance)
+	assert.Nil(t, err)
+	assert.NotNil(t, opaInstance)
 func TestGetJSONReader(t *testing.T) {
-        // Create a mock file
-        mockFile := new(MockFile)
-        mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+	// Create a mock file
+	mockFile := new(MockFile)
+	mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
-        // Call the function with mock functions
-        jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
+	// Call the function with mock functions
+	jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
-        // Check the results
-        assert.NoError(t, err)
-        assert.NotNil(t, jsonReader)
+	// Check the results
+	assert.NoError(t, err)
+	assert.NotNil(t, jsonReader)
-        // Check the content of the jsonReader
-        expectedContent := `{"config": "test"}`
-        actualContent := make([]byte, len(expectedContent))
-        jsonReader.Read(actualContent)
-        assert.Equal(t, expectedContent, string(actualContent))
+	// Check the content of the jsonReader
+	expectedContent := `{"config": "test"}`
+	actualContent := make([]byte, len(expectedContent))
+	jsonReader.Read(actualContent)
+	assert.Equal(t, expectedContent, string(actualContent))
-        // Assert that the mock methods were called
-        mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+	// Assert that the mock methods were called
+	mockFile.AssertCalled(t, "Open", "/app/config/config.json")
 func TestGetJSONReader_ReadAllError(t *testing.T) {
-        mockFile := new(MockFile)
-        mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+	mockFile := new(MockFile)
+	mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
-        // Simulate ReadAll error
-        jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) {
-                return nil, io.ErrUnexpectedEOF
-        })
+	// Simulate ReadAll error
+	jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) {
+		return nil, io.ErrUnexpectedEOF
+	})
-        assert.Error(t, err)
-        assert.Nil(t, jsonReader)
+	assert.Error(t, err)
+	assert.Nil(t, jsonReader)
-        mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+	mockFile.AssertCalled(t, "Open", "/app/config/config.json")
 func TestGetOPASingletonInstance(t *testing.T) {
     // Call your function under test
     opaInstance, err := GetOPASingletonInstance()
@@ -210,7 +209,6 @@ func TestGetOPASingletonInstance(t *testing.T) {
     assert.NotNil(t, opaInstance, "OPA instance should be nil when sdk.New fails")
 // Helper to reset the singleton for testing
 func resetSingleton() {
 	opaInstance = nil
diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go
index 8ce738b..005dd03 100644
--- a/pkg/pdpattributes/pdpattributes.go
+++ b/pkg/pdpattributes/pdpattributes.go
@@ -33,12 +33,12 @@ var (
 func init() {
-	PdpName = GenerateUniquePdpName()
+	PdpName = generateUniquePdpName()
 	log.Debugf("Name: %s", PdpName)
 // Generates a unique PDP name by appending a randomly generated UUID
-func GenerateUniquePdpName() string {
+func generateUniquePdpName() string {
 	return "opa-" + uuid.New().String()
@@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) {
 // Retrieves the current PDP subgroup value.
-func GetPdpSubgroup() string {
+func getPdpSubgroup() string {
 	return PdpSubgroup
@@ -58,7 +58,7 @@ func SetPdpHeartbeatInterval(pdpHeartbeatInterval int64) {
 // Retrieves the current PDP heartbeat interval value.
-func GetPdpHeartbeatInterval() int64 {
+func getPdpHeartbeatInterval() int64 {
 	return PdpHeartbeatInterval
diff --git a/pkg/pdpattributes/pdpattributes_test.go b/pkg/pdpattributes/pdpattributes_test.go
index 909b4ee..c9a41c7 100644
--- a/pkg/pdpattributes/pdpattributes_test.go
+++ b/pkg/pdpattributes/pdpattributes_test.go
@@ -28,15 +28,15 @@ import (
 func TestGenerateUniquePdpName_Success(t *testing.T) {
 	t.Run("GenerateValidPdpName", func(t *testing.T) {
-		pdpName := GenerateUniquePdpName()
+		pdpName := generateUniquePdpName()
 		assert.Contains(t, pdpName, "opa-", "Expected PDP name to start with 'opa-'")
 func TestGenerateUniquePdpName_Failure(t *testing.T) {
 	t.Run("UniqueNamesCheck", func(t *testing.T) {
-		pdpName1 := GenerateUniquePdpName()
-		pdpName2 := GenerateUniquePdpName()
+		pdpName1 := generateUniquePdpName()
+		pdpName2 := generateUniquePdpName()
 		assert.NotEqual(t, pdpName1, pdpName2, "Expected different UUID for each generated PDP name")
 		assert.Len(t, pdpName1, len("opa-")+36, "Expected length of PDP name to match 'opa-<UUID>' format")
@@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) {
 	t.Run("ValidSubgroup", func(t *testing.T) {
 		expectedSubgroup := "subgroup1"
-		assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value")
+		assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value")
 func TestSetPdpSubgroup_Failure(t *testing.T) {
 	t.Run("EmptySubgroup", func(t *testing.T) {
-		assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
+		assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
 	t.Run("LargeSubgroup", func(t *testing.T) {
@@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) {
 			largeSubgroup[i] = 'a'
-		assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value")
+		assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value")
@@ -70,19 +70,19 @@ func TestSetPdpHeartbeatInterval_Success(t *testing.T) {
 	t.Run("ValidHeartbeatInterval", func(t *testing.T) {
 		expectedInterval := int64(30)
-		assert.Equal(t, expectedInterval, GetPdpHeartbeatInterval(), "Expected heartbeat interval to match set value")
+		assert.Equal(t, expectedInterval, getPdpHeartbeatInterval(), "Expected heartbeat interval to match set value")
 func TestSetPdpHeartbeatInterval_Failure(t *testing.T) {
 	t.Run("FailureHeartbeatInterval", func(t *testing.T) {
-		assert.Equal(t, int64(-10), GetPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values")
+		assert.Equal(t, int64(-10), getPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values")
 	t.Run("LargeHeartbeatInterval", func(t *testing.T) {
 		largeInterval := int64(time.Hour * 24 * 365 * 10) // 10 years in seconds
-		assert.Equal(t, largeInterval, GetPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values")
+		assert.Equal(t, largeInterval, getPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values")
diff --git a/pkg/policymap/policy_and_data_map.go b/pkg/policymap/policy_and_data_map.go
new file mode 100644
index 0000000..5e06a59
--- /dev/null
+++ b/pkg/policymap/policy_and_data_map.go
@@ -0,0 +1,204 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+// will process the update message from pap and send the pdp status response.
+package policymap
+import (
+	"encoding/json"
+	"fmt"
+	"policy-opa-pdp/pkg/log"
+	"policy-opa-pdp/pkg/model"
+var (
+	LastDeployedPolicies string
+func formatPolicyAndDataMap(deployedPolicies []map[string]interface{}) (string, error) {
+	// Create the final JSON
+	finalMap := map[string]interface{}{
+		"deployed_policies_dict": deployedPolicies,
+	}
+	// Marshal the final map into JSON
+	policyDataJSON, err := FormatMapofAnyType(finalMap)
+	if err != nil {
+		return "", fmt.Errorf("failed to format json: %v", err)
+	}
+	// Update global state
+	LastDeployedPolicies = policyDataJSON
+	log.Infof("PoliciesDeployed Map: %v", LastDeployedPolicies)
+	return LastDeployedPolicies, nil
+func FormatMapofAnyType[T any](mapOfAnyType T) (string, error) {
+	// Marshal the final map into JSON
+	jsonBytes, err := json.MarshalIndent(mapOfAnyType, "", " ")
+	if err != nil {
+		return "", fmt.Errorf("failed to format json: %v", err)
+	}
+	return string(jsonBytes), nil
+func UnmarshalLastDeployedPolicies(lastdeployedPolicies string) ([]map[string]interface{}, error) {
+	if len(lastdeployedPolicies) == 0 {
+		return []map[string]interface{}{}, nil
+	}
+	var policiesMap struct {
+		DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"`
+	}
+	err := json.Unmarshal([]byte(lastdeployedPolicies), &policiesMap)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal LastDeployedPolicies: %v", err)
+	}
+	return policiesMap.DeployedPoliciesDict, nil
+func UpdateDeployedPoliciesinMap(policy model.ToscaPolicy) (string, error) {
+	// Unmarshal the last known policies
+	deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+	if err != nil {
+		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+	}
+	dataKeys := make([]string, 0, len(policy.Properties.Data))
+	policyKeys := make([]string, 0, len(policy.Properties.Policy))
+	for key := range policy.Properties.Data {
+		dataKeys = append(dataKeys, key)
+	}
+	for key := range policy.Properties.Policy {
+		policyKeys = append(policyKeys, key)
+	}
+	directoryMap := map[string]interface{}{
+		"policy-id":      policy.Metadata.PolicyID,
+		"policy-version": policy.Metadata.PolicyVersion,
+		"data":           dataKeys,
+		"policy":         policyKeys,
+	}
+	deployedPolicies = append(deployedPolicies, directoryMap)
+	return formatPolicyAndDataMap(deployedPolicies)
+func RemoveUndeployedPoliciesfromMap(undeployedPolicy map[string]interface{}) (string, error) {
+	// Unmarshal the last known policies
+	deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+	if err != nil {
+		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+	}
+	remainingPolicies := []map[string]interface{}{}
+	for _, policy := range deployedPolicies {
+		shouldRetain := true
+		if policy["policy-id"] == undeployedPolicy["policy-id"] && policy["policy-version"] == undeployedPolicy["policy-version"] {
+			shouldRetain = false
+		}
+		if shouldRetain {
+			remainingPolicies = append(remainingPolicies, policy)
+		}
+	}
+	return formatPolicyAndDataMap(remainingPolicies)
+func VerifyAndReturnPoliciesToBeDeployed(lastdeployedPoliciesMap string, pdpUpdate model.PdpUpdate) []model.ToscaPolicy {
+	type PoliciesMap struct {
+		DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"`
+	}
+	var policiesMap PoliciesMap
+	err := json.Unmarshal([]byte(lastdeployedPoliciesMap), &policiesMap)
+	if err != nil {
+		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+		return pdpUpdate.PoliciesToBeDeployed
+	}
+	deployedPolicies := policiesMap.DeployedPoliciesDict
+	var policiesToBeDeployed []model.ToscaPolicy
+	for _, deployingPolicy := range pdpUpdate.PoliciesToBeDeployed {
+		shouldDeploy := true
+		for _, deployedPolicy := range deployedPolicies {
+			if deployedPolicy["policy-id"] == deployingPolicy.Name && deployedPolicy["policy-version"] == deployingPolicy.Version {
+				log.Infof("Policy Previously deployed: %v %v, skipping", deployingPolicy.Name, deployingPolicy.Version)
+				shouldDeploy = false
+				break
+			}
+		}
+		if shouldDeploy {
+			log.Infof("Policy is new and should be deployed: %v %v", deployingPolicy.Name, deployingPolicy.Version)
+			policiesToBeDeployed = append(policiesToBeDeployed, deployingPolicy)
+		}
+	}
+	return policiesToBeDeployed
+func ExtractDeployedPolicies(policiesMap string) []model.ToscaConceptIdentifier {
+	// Unmarshal the last known policies
+	deployedPolicies, err := UnmarshalLastDeployedPolicies(policiesMap)
+	if err != nil {
+		log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+	}
+	pdpstatus := model.PdpStatus{
+		Policies: []model.ToscaConceptIdentifier{},
+	}
+	for _, policy := range deployedPolicies {
+		// Extract policy-id and policy-version
+		policyID, idOk := policy["policy-id"].(string)
+		policyVersion, versionOk := policy["policy-version"].(string)
+		if !idOk || !versionOk {
+			log.Warnf("Missing or invalid policy-id or policy-version")
+			return nil
+		}
+		tosca := model.ToscaConceptIdentifier{
+			Name:    policyID,
+			Version: policyVersion,
+		}
+		pdpstatus.Policies = append(pdpstatus.Policies, tosca)
+	}
+	return pdpstatus.Policies
+func GetTotalDeployedPoliciesCountFromMap() int {
+        deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies)
+        if err != nil {
+                log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err)
+		return 0
+        }
+        return len(deployedPolicies)
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 9b405d5..313b9a6 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -21,7 +21,14 @@
 package utils
 import (
+	"fmt"
+	""
+	"os"
+	"path/filepath"
+	"policy-opa-pdp/pkg/log"
+	"policy-opa-pdp/pkg/model"
+	"strings"
 // validates if the given request is in valid uuid form
@@ -29,3 +36,181 @@ func IsValidUUID(u string) bool {
 	_, err := uuid.Parse(u)
 	return err == nil
+// Helper function to create a directory if it doesn't exist
+func CreateDirectory(dirPath string) error {
+	err := os.MkdirAll(dirPath, os.ModePerm)
+	if err != nil {
+		log.Errorf("Failed to create directory %s: %v", dirPath, err)
+		return err
+	}
+	log.Infof("Directory created: %s", dirPath)
+	return nil
+// Helper function to check and remove a directory
+func RemoveDirectory(dirPath string) error {
+	entries, err := os.ReadDir(dirPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			log.Warnf("Directory does not exist: %s", dirPath)
+			// Directory does not exist, nothing to do
+			return nil
+		}
+		return fmt.Errorf("failed to read directory: %w", err)
+	}
+	for _, entry := range entries {
+		entryPath := filepath.Join(dirPath, entry.Name())
+		if entry.IsDir() {
+			// Check if the subdirectory is empty and delete it
+			isEmpty, err := isDirEmpty(entryPath)
+			if err != nil {
+				return err
+			}
+			if isEmpty {
+				log.Debugf("Removing empty subdirectory: %s", entryPath)
+				if err := os.RemoveAll(entryPath); err != nil {
+					return fmt.Errorf("failed to remove directory: %s, error: %w", entryPath, err)
+				}
+			}
+		} else {
+			// Delete specific files in the parent directory
+			if entry.Name() == "data.json" || entry.Name() == "policy.rego" {
+				log.Debugf("Removing file: %s", entryPath)
+				if err := os.Remove(entryPath); err != nil {
+					return fmt.Errorf("failed to remove file: %s, error: %w", entryPath, err)
+				}
+			}
+		}
+	}
+	return nil
+// Helper function to check if a directory is empty
+func isDirEmpty(dirPath string) (bool, error) {
+	entries, err := os.ReadDir(dirPath)
+	if err != nil {
+		return false, fmt.Errorf("failed to read directory: %s, error: %w", dirPath, err)
+	}
+	return len(entries) == 0, nil
+func ValidateFieldsStructs(pdpUpdate model.PdpUpdate) error {
+	//Initialize Validator and validate Struct after unmarshalling
+	validate := validator.New()
+	err := validate.Struct(pdpUpdate)
+	if err != nil {
+		for _, err := range err.(validator.ValidationErrors) {
+			log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag())
+		}
+		return err
+	}
+	return err
+// Validate validates the fields based on your requirements.
+func ValidateToscaPolicyJsonFields(policy model.ToscaPolicy) error {
+	// 1. Validate that Name, Version, and Metadata fields match.
+	emphasize := "Validation emphasizes the condition"
+	if policy.Name != policy.Metadata.PolicyID {
+		return fmt.Errorf("policy name '%s' does not match metadata policy-id '%s', '%s'", policy.Name, policy.Metadata.PolicyID, emphasize)
+	}
+	if policy.Version != policy.Metadata.PolicyVersion {
+		return fmt.Errorf("policy version '%s' does not match metadata policy-version '%s', '%s'", policy.Version, policy.Metadata.PolicyVersion, emphasize)
+	}
+	if policy.Properties.Data != nil {
+		// 2. Validate that Name is a suffix for keys in Properties.Data and Properties.Policy.
+		keySeen := make(map[string]bool)
+		for key := range policy.Properties.Data {
+			if keySeen[key] {
+				return fmt.Errorf("duplicate data key '%s' found, '%s'", key, emphasize)
+			}
+			keySeen[key] = true
+			if !strings.HasPrefix(key, "node." + policy.Name) {
+				return fmt.Errorf("data key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize)
+			}
+		}
+	}
+	keySeen := make(map[string]bool)
+	for key := range policy.Properties.Policy {
+		if keySeen[key] {
+			return fmt.Errorf("duplicate policy key '%s' found, '%s'", key, emphasize)
+		}
+		keySeen[key] = true
+		if !strings.HasPrefix(key, policy.Name) {
+			return fmt.Errorf("policy key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize)
+		}
+	}
+	return nil
+func IsPolicyNameAllowed(policy model.ToscaPolicy, deployedPolicies []map[string]interface{}) (bool, error) {
+	policyID := policy.Name
+	if policyID == "" {
+		return false, fmt.Errorf("Policy Name cannot be Empty")
+	}
+	policyHierarchyLevel := strings.Split(policyID, ".")
+	for _, deployedPolicy := range deployedPolicies {
+		deployedPolicyID, ok := deployedPolicy["policy-id"].(string)
+		if !ok {
+			return false, fmt.Errorf("Invalid or missing policy-id field")
+		}
+		deployedPolicyIDHierarchyLevel := strings.Split(deployedPolicyID, ".")
+		if isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) {
+			return false, fmt.Errorf("Policy Validation Failed : Policy-id: %s is parent  of deployed policy, overrides existing policy: %s", policyID, deployedPolicyID)
+		}
+		if isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) {
+			return false, fmt.Errorf("Policy Validation Failed:  Policy-id: %s is child  of deployed policy , can overwrite existing policy: %s", policyID, deployedPolicyID)
+		}
+	}
+	return true, nil
+func isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool {
+	// new policy should have fewer levels than deployed policy to be a parent
+	if len(policyHierarchyLevel) < len(deployedPolicyIDHierarchyLevel) {
+	for policyNameIndex := range policyHierarchyLevel {
+		if policyHierarchyLevel[policyNameIndex] != deployedPolicyIDHierarchyLevel[policyNameIndex] {
+			return false
+		}
+	}
+	return true
+        }
+	return false
+func isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool {
+	// new policy should have more levels than deployed policy to be a  child
+	if len(policyHierarchyLevel) > len(deployedPolicyIDHierarchyLevel) {
+	for policyNameIndex := range deployedPolicyIDHierarchyLevel {
+		if deployedPolicyIDHierarchyLevel[policyNameIndex] != policyHierarchyLevel[policyNameIndex] {
+			return false
+		}
+	}
+	return true
+        }
+	return false
diff --git a/test/ b/test/
index 96f4eed..bc9f931 100644
--- a/test/
+++ b/test/
@@ -1,5 +1,19 @@
 # Testing OPA
+## Curl URL For Deployment.
+1. `curl -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type":"application/yaml" --data-binary @test_resources/policy_deploy_single_policy.yaml http://localhost:30002/policy/api/v1/policytypes/onap.policies.native.opa/versions/1.0.0/policies`
+2. `curl -u 'policyadmin:zb!XztG34' -X POST  -H "Content-Type":"application/json" -d  @test_resources/deploy.json http://localhost:30003/policy/pap/v1/pdps/policies`
+## Curl URL For Undeployment
+`curl -u 'policyadmin:zb!XztG34' -X DELETE http://localhost:30003/policy/pap/v1/pdps/policies/role/versions/1.0.0` , where role is the policy name.
+## Curl URL for Batch Undeployment.
+`curl -v -u 'policyadmin:zb!XztG34' -X POST  -H "Content-Type":"application/json" -d  @test_resources/undeploy_batch_delete.json  http://localhost:30003/policy/pap/v1/pdps/deployments/batch`
 ## Verification API Calls
 curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "11:34:56", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyFilter" : [""], "policyName":"example","input":{"method":"POST","path":["users"]}}' -X POST
diff --git a/test/docker-compose.yml b/test/docker-compose.yml
index 273fd1d..a4dea24 100644
--- a/test/docker-compose.yml
+++ b/test/docker-compose.yml
@@ -97,13 +97,6 @@ services:
        - ./
        - ./Opagroup.json:/app/Opagroup.json
        - ./policy-new.yaml:/app/policy-new.yaml
-       - type: bind
-         source: ./policies
-         target: /opt/policies 
-       - type: bind
-         source: ./data
-         target: /opt/data 
         LOG_LEVEL: debug
         KAFKA_URL: "kafka:9092"
diff --git a/test/test_resources/deploy.json b/test/test_resources/deploy.json
new file mode 100644
index 0000000..2698286
--- /dev/null
+++ b/test/test_resources/deploy.json
@@ -0,0 +1,10 @@
+   "policies": [
+           {
+                    "policy-id": "role",
+                    "policy-version": "1.0.0"
+           }
+   ]
diff --git a/test/test_resources/deploy_collab.json b/test/test_resources/deploy_collab.json
new file mode 100644
index 0000000..06d43c9
--- /dev/null
+++ b/test/test_resources/deploy_collab.json
@@ -0,0 +1,10 @@
+   "policies": [
+           {
+                    "policy-id": "collab",
+                    "policy-version": "1.0.0"
+           }
+   ]
diff --git a/test/test_resources/deploy_conflict.json b/test/test_resources/deploy_conflict.json
new file mode 100644
index 0000000..72d2fcd
--- /dev/null
+++ b/test/test_resources/deploy_conflict.json
@@ -0,0 +1,10 @@
+   "policies": [
+           {
+                    "policy-id": "conflict",
+                    "policy-version": "1.0.0"
+           }
+   ]
diff --git a/test/test_resources/deploy_zone.json b/test/test_resources/deploy_zone.json
new file mode 100644
index 0000000..217457b
--- /dev/null
+++ b/test/test_resources/deploy_zone.json
@@ -0,0 +1,11 @@
+   "policies": [
+           {
+                    "policy-id": "zone",
+                    "policy-version": "1.0.0"
+           }
+   ]
diff --git a/test/test_resources/policy_collab.yaml b/test/test_resources/policy_collab.yaml
new file mode 100644
index 0000000..e56bbed
--- /dev/null
+++ b/test/test_resources/policy_collab.yaml
@@ -0,0 +1,18 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+  policies:
+    - collab:
+        type: onap.policies.native.opa
+        type_version: 1.0.0
+        properties:
+          data:
+          policy:
+            collab.conflict: cGFja2FnZSBjb2xsYWIuY29uZmxpY3QKCmltcG9ydCByZWdvLnYxCgphbGxvdyBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CmRlbnkgaWYgeyBpbnB1dC5uYW1lID09ICJhbGljZSIgfQoKIyBkZW55IGV2ZXJ5dGhpbmcgYnkgZGVmYXVsdApkZWZhdWx0IGF1dGh6IDo9IGZhbHNlCgojIGRlbnkgb3ZlcnJpZGVzIGFsbG93CmF1dGh6IGlmIHsKICAgIGFsbG93CiAgICBub3QgZGVueQp9Cg==
+            collab.action: cGFja2FnZSBjb2xsYWIuYWN0aW9uCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIGFkbWluIHJvbGUgaXMgZ3JhbnRlZCBwZXJtaXNzaW9uIHRvIHBlcmZvcm0gdGhlIGFjdGlvbi4KYWxsb3cgaWYgewogICAgc29tZSBpCiAgICBkYXRhLmNvbGxhYi5hY3Rpb24udXNlcl9yb2xlc1tpbnB1dC51c2VyXVtpXSA9PSByb2xlCiAgICBzb21lIGoKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLmFjdGlvbnNbal0gPT0gaW5wdXQuYWN0aW9uCiAgICBzb21lIGsKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLnJlc291cmNlc1trXSA9PSBpbnB1dC50eXBlCn0KIyAgICAgICAqIFJlZ28gY29tcGFyaXNvbiB0byBvdGhlciBzeXN0ZW1zOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0L2NvbXBhcmlzb24tdG8tb3RoZXItc3lzdGVtcy8KIyAgICAgICAqIFJlZ28gSXRlcmF0aW9uOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0LyNpdGVyYXRpb24KCg==
+            collab: cGFja2FnZSBjb2xsYWIKCmltcG9ydCByZWdvLnYxCmltcG9ydCBkYXRhLmNvbGxhYi5jb25mbGljdAppbXBvcnQgZGF0YS5jb2xsYWIuYWN0aW9uCgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCmFsbG93IGlmIHsKICAgIGNvbmZsaWN0LmFsbG93CiAgICBhY3Rpb24uYWxsb3cKfQ==
+        name: collab
+        version: 1.0.0
+        metadata:
+          policy-id: collab
+          policy-version: 1.0.0
diff --git a/test/test_resources/policy_conflict.yaml b/test/test_resources/policy_conflict.yaml
new file mode 100644
index 0000000..0406179
--- /dev/null
+++ b/test/test_resources/policy_conflict.yaml
@@ -0,0 +1,15 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+  policies:
+    - conflict:
+        type: onap.policies.native.opa
+        type_version: 1.0.0
+        properties:
+          data:
+          policy:
+            conflict: cGFja2FnZSBjb25mbGljdAoKaW1wb3J0IHJlZ28udjEKCmFsbG93IGlmIHsgaW5wdXQubmFtZSA9PSAiYWxpY2UiIH0KZGVueSBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CgojIGRlbnkgZXZlcnl0aGluZyBieSBkZWZhdWx0CmRlZmF1bHQgYXV0aHogOj0gZmFsc2UKCiMgZGVueSBvdmVycmlkZXMgYWxsb3cKYXV0aHogaWYgewogICAgYWxsb3cKICAgIG5vdCBkZW55Cn0KCg==
+        name: conflict
+        version: 1.0.0
+        metadata:
+          policy-id: conflict
+          policy-version: 1.0.0
diff --git a/test/test_resources/policy_deploy_single_policy.yaml b/test/test_resources/policy_deploy_single_policy.yaml
new file mode 100644
index 0000000..872b6c6
--- /dev/null
+++ b/test/test_resources/policy_deploy_single_policy.yaml
@@ -0,0 +1,17 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+  policies:
+    - role:
+        type: onap.policies.native.opa
+        type_version: 1.0.0
+        properties:
+          data:
+          policy:
+            role: cGFja2FnZSByb2xlCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgojIEFsbG93IGFkbWlucyB0byBkbyBhbnl0aGluZy4KYWxsb3cgaWYgdXNlcl9pc19hZG1pbgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIHRoZSB1c2VyIGlzIGdyYW50ZWQgcGVybWlzc2lvbiB0byBwZXJmb3JtIHRoZSBhY3Rpb24uCmFsbG93IGlmIHsKICAgICAgICAjIEZpbmQgZ3JhbnRzIGZvciB0aGUgdXNlci4KICAgICAgICBzb21lIGdyYW50IGluIHVzZXJfaXNfZ3JhbnRlZAoKICAgICAgICAjIENoZWNrIGlmIHRoZSBncmFudCBwZXJtaXRzIHRoZSBhY3Rpb24uCiAgICAgICAgaW5wdXQuYWN0aW9uID09IGdyYW50LmFjdGlvbgogICAgICAgIGlucHV0LnR5cGUgPT0gZ3JhbnQudHlwZQp9CgojIHVzZXJfaXNfYWRtaW4gaXMgdHJ1ZSBpZiAiYWRtaW4iIGlzIGFtb25nIHRoZSB1c2VyJ3Mgcm9sZXMgYXMgcGVyIGRhdGEudXNlcl9yb2xlcwp1c2VyX2lzX2FkbWluIGlmICJhZG1pbiIgaW4gZGF0YS5yb2xlLnVzZXJfcm9sZXNbaW5wdXQudXNlcl0KCiMgdXNlcl9pc19ncmFudGVkIGlzIGEgc2V0IG9mIGdyYW50cyBmb3IgdGhlIHVzZXIgaWRlbnRpZmllZCBpbiB0aGUgcmVxdWVzdC4KIyBUaGUgYGdyYW50YCB3aWxsIGJlIGNvbnRhaW5lZCBpZiB0aGUgc2V0IGB1c2VyX2lzX2dyYW50ZWRgIGZvciBldmVyeS4uLgp1c2VyX2lzX2dyYW50ZWQgY29udGFpbnMgZ3JhbnQgaWYgewogICAgICAgICMgYHJvbGVgIGFzc2lnbmVkIGFuIGVsZW1lbnQgb2YgdGhlIHVzZXJfcm9sZXMgZm9yIHRoaXMgdXNlci4uLgogICAgICAgIHNvbWUgcm9sZSBpbiBkYXRhLnJvbGUudXNlcl9yb2xlc1tpbnB1dC51c2VyXQoKICAgICAgICAjIGBncmFudGAgYXNzaWduZWQgYSBzaW5nbGUgZ3JhbnQgZnJvbSB0aGUgZ3JhbnRzIGxpc3QgZm9yICdyb2xlJy4uLgogICAgICAgIHNvbWUgZ3JhbnQgaW4gZGF0YS5yb2xlLnJvbGVfZ3JhbnRzW3JvbGVdCn0KCiMgICAgICAgKiBSZWdvIGNvbXBhcmlzb24gdG8gb3RoZXIgc3lzdGVtczogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC9jb21wYXJpc29uLXRvLW90aGVyLXN5c3RlbXMvCiMgICAgICAgKiBSZWdvIEl0ZXJhdGlvbjogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC8jaXRlcmF0aW9uCgo=
+        name: role
+        version: 1.0.0
+        metadata:
+          policy-id: role
+          policy-version: 1.0.0
diff --git a/test/test_resources/policy_zone.yaml b/test/test_resources/policy_zone.yaml
new file mode 100644
index 0000000..4f3ade1
--- /dev/null
+++ b/test/test_resources/policy_zone.yaml
@@ -0,0 +1,16 @@
+tosca_definitions_version: tosca_simple_yaml_1_1_0
+  policies:
+    - zone:
+        type: onap.policies.native.opa
+        type_version: 1.0.0
+        properties:
+          data:
+             zone: ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo=
+          policy:
+            zone : cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg==
+        name: zone
+        version: 1.0.0
+        metadata:
+          policy-id: zone
+          policy-version: 1.0.0
diff --git a/test/test_resources/undeploy_batch_delete.json b/test/test_resources/undeploy_batch_delete.json
new file mode 100644
index 0000000..9195f74
--- /dev/null
+++ b/test/test_resources/undeploy_batch_delete.json
@@ -0,0 +1,23 @@
+    "groups": [
+        {
+            "name": "opaGroup",
+            "deploymentSubgroups": [
+                {
+                    "pdpType": "opa",
+                    "action": "DELETE",
+                    "policies": [
+                        {
+                            "name": "account",
+                            "version": "1.0.0"
+                        },
+                        {
+                            "name": "organization",
+                            "version": "1.0.0"
+                        }
+                    ]
+                }
+            ]
+        }
+    ]