diff options
author | 2025-02-12 12:35:10 +0100 | |
---|---|---|
committer | 2025-02-14 13:52:21 +0100 | |
commit | 358eb80b992050e3749834b6211edb6426325020 (patch) | |
tree | 069d4720a47dbcd646afa0b723d0b8595812d3d2 | |
parent | c98f9a1086c1ede4b6ccb6a566197533b3001826 (diff) |
Design and Develop run time policy updates to OPA-PDP
Issue-ID: POLICY-5216
Change-Id: I2a1466f74106bbab7869dd82f29badd157b980bb
Signed-off-by: Murali Parthasarathy K <muraliparthasarathy.k@techmahindra.com>
49 files changed, 2460 insertions, 672 deletions
@@ -1,6 +1,6 @@ # - # ========================LICENSE_START================================= -# Copyright (C) 2024: Deutsche Telekom +# Copyright (C) 2024-2025: Deutsche Telekom # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +64,10 @@ RUN mkdir /app COPY --from=compile /app /app RUN chmod +x /app/opa-pdp +RUN mkdir /opt/policies +RUN mkdir /opt/data + + # Copy our opa executable from build stage COPY --from=build /tmp/opa /app/opa RUN chmod 755 /app/opa @@ -8,13 +8,13 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 . 1. docker image ls | grep opa-pdp 2. inside test directory run - docker-compose down - + 3. docker-compose up -d 4. docker logs -f opa-pdp ## Generating models with openapi.yaml - + 1. oapi-codegen -package=oapicodegen -generate "models" openapi.yaml > models.go ## Creating new Policy @@ -23,13 +23,13 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 . 2. Inside this directory create a policy [i.e; rego file] named policy.rego. Version 1 i.e v1 is supported for rego files. -3. For contents you can see example of policy.rego under test/policies/role/policy.rego. +3. For contents you can see example of policy.rego under test/policies/role/policy.rego. 3. Inside test/policies/data create a new directory with the package name of policy.rego. For example test/policies/data/role 4. Create a file data.json under the newly created directory inside data. For example test/policies/data/data.json -5. In policy.rego the package declaration organizes the policy rules. This allows +5. In policy.rego the package declaration organizes the policy rules. This allows 6. The Rule allow evaluates to true/false based on the logic defined in policy.rego @@ -39,10 +39,28 @@ docker build -f ./build/Dockerfile -t opa-pdp:1.0.0 . 9. To deploy a new policy opa-pdp need to be redpolyed i.e; docker-compose down and up need to be executed. +## Deploying New Policy + +1. Create a tosca policy file that has policy.rego and data.json encoded contents. + +2. For example refer to test/policy_deployment.yaml. + +3. OPA emphasizes that each policy should have a unique policy-name/policy-id, + + example: + Not Allowed --> when policy with name onap.org.cell is deployed and when onap.org.cell.consistency not allowed for deployment since it carries the same hierarchy. + Allowed --> Policy with name onap.org.cell is deployed and when onap.org.consistency is allowed for deployment since it does not have the same hierarchy. + + +4. Policy and data key should start (prefixed) with policy-id. For ex refer totest/testresources/policy_deploy_single_policy.yaml. + +5. Create a deploy.json file to deploy through pap. Refer to file under test/testresources/deploy.json. + + ## Testing Decision Api -send json -{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}} +send json +{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22 12:08:00.123456+0000 ", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}} to opa-pdp as shown in curl commands below. "policyName":"[packagename in rego file]/allow" diff --git a/api/register-handlers.go b/api/register-handlers.go index c5eb5df..0504e48 100644 --- a/api/register-handlers.go +++ b/api/register-handlers.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import ( "policy-opa-pdp/pkg/decision" "policy-opa-pdp/pkg/healthcheck" "policy-opa-pdp/pkg/metrics" + "policy-opa-pdp/pkg/opasdk" ) // RegisterHandlers registers the HTTP handlers for the service. @@ -53,6 +54,9 @@ func RegisterHandlers() { statisticsReportHandler := http.HandlerFunc(metrics.FetchCurrentStatistics) http.HandleFunc("/policy/pdpo/v1/statistics", basicAuth(statisticsReportHandler)) + listPoliciesHandler := http.HandlerFunc(opasdk.ListPolicies) + http.Handle("/opa/listpolicies", listPoliciesHandler) + } // handles authentication diff --git a/build/Dockerfile b/build/Dockerfile deleted file mode 100644 index 84359c3..0000000 --- a/build/Dockerfile +++ /dev/null @@ -1,77 +0,0 @@ -# - -# ========================LICENSE_START================================= -# Copyright (C) 2024: Deutsche Telekom -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# SPDX-License-Identifier: Apache-2.0 -# ========================LICENSE_END=================================== -# - -FROM curlimages/curl:7.78.0 AS build - -# Get OPA -RUN curl -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64 - -FROM golang:1.23 AS compile - -RUN mkdir /app - -COPY ../go.mod ../go.sum /app/ - -COPY . . - -RUN mkdir /app/cfg -ADD ../cfg /app/cfg - -RUN mkdir /app/consts -ADD ../consts /app/consts - -RUN mkdir /app/api -ADD ../api /app/api - -RUN mkdir /app/cmd -ADD ../cmd /app/cmd - -RUN mkdir /app/pkg -ADD ../pkg /app/pkg - -RUN mkdir /app/bundles - -WORKDIR /app - -# Build the binary -RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go -#COPY config.json /app/config.json -#RUN chmod 644 /app/config.json - -FROM ubuntu - -RUN apt-get update && apt-get install -y netcat-openbsd && rm -rf /var/lib/apt/lists/* - -RUN apt-get update && apt-get install -y curl - -# Copy our static executable from compile stage -RUN mkdir /app -COPY --from=compile /app /app -RUN chmod +x /app/opa-pdp - -# Copy our opa executable from build stage -COPY --from=build /tmp/opa /app/opa -RUN chmod 755 /app/opa - -WORKDIR /app -EXPOSE 8282 - -# Command to run OPA with the policies -CMD ["/app/opa-pdp"] - diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go index a2cbde2..ccc9f5d 100644 --- a/cmd/opa-pdp/opa-pdp.go +++ b/cmd/opa-pdp/opa-pdp.go @@ -68,7 +68,8 @@ func main() { // Initialize Handlers and Build Bundle initializeHandlersFunc() - if err := initializeBundleFunc(exec.Command); err != nil { + if output, err := initializeBundleFunc(exec.Command); err != nil { + log.Warnf("Output %s", string(output)) log.Warnf("Failed to initialize bundle: %s", err) } @@ -95,11 +96,11 @@ func main() { defer producer.Close() sender := &publisher.RealPdpStatusSender{Producer: producer} + // start pdp message handler in a seperate routine handleMessagesFunc(ctx, kc, sender) - + time.Sleep(10 * time.Second) - // pdp registration isRegistered := registerPDPFunc(sender) if !isRegistered { @@ -141,7 +142,7 @@ func initializeHandlers() { } // build bundle tar file -func initializeBundle(execCmd func(string, ...string) *exec.Cmd) error { +func initializeBundle(execCmd func(string, ...string) *exec.Cmd) (string, error) { return bundleserver.BuildBundle(execCmd) } @@ -228,3 +229,4 @@ myLoop: time.Sleep(time.Duration(consts.SHUTDOWN_WAIT_TIME) * time.Second) } + diff --git a/cmd/opa-pdp/opa-pdp_test.go b/cmd/opa-pdp/opa-pdp_test.go index 21d9154..87fb2de 100644 --- a/cmd/opa-pdp/opa-pdp_test.go +++ b/cmd/opa-pdp/opa-pdp_test.go @@ -21,26 +21,26 @@ package main import ( "context" + "errors" + "fmt" "net/http" "os" "os/exec" "policy-opa-pdp/consts" "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/kafkacomm/handler" "policy-opa-pdp/pkg/kafkacomm/mocks" "policy-opa-pdp/pkg/kafkacomm/publisher" - "policy-opa-pdp/pkg/kafkacomm/handler" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/model" - "fmt" + "reflect" "testing" "time" - "errors" - "reflect" - "bou.ke/monkey" + "bou.ke/monkey" + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/confluentinc/confluent-kafka-go/kafka" ) // Mock objects and functions @@ -57,8 +57,8 @@ func (m *MockKafkaConsumerInterface) Close() { } func (m *MockKafkaConsumerInterface) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) { - args := m.Called(kc) - return args.Get(0).([]byte), args.Error(0) + args := m.Called(kc) + return args.Get(0).([]byte), args.Error(0) } type MockPdpStatusSender struct { @@ -71,11 +71,10 @@ func (m *MockPdpStatusSender) SendRegistration() error { } func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error { - args := m.Called(pdpStatus) - return args.Error(0) + args := m.Called(pdpStatus) + return args.Error(0) } - type MockServer struct { *http.Server mock.Mock @@ -129,8 +128,8 @@ func TestMainFunction(t *testing.T) { } // Mock initializeBundle - initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error { - return nil // no error expected + initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) { + return "", nil // no error expected } // Use an actual *http.Server instance for testing @@ -207,8 +206,8 @@ func TestInitializeBundle(t *testing.T) { mockExecCmd := func(name string, arg ...string) *exec.Cmd { return exec.Command("echo") } - err := initializeBundle(mockExecCmd) - assert.NoError(t, err, "Expected no error from initializeBundle") + output,err := initializeBundle(mockExecCmd) + assert.NoError(t, err, output) } // Test to verify that the HTTP server starts successfully. @@ -226,415 +225,411 @@ func TestInitializeOPA(t *testing.T) { // Test to ensure the application correctly waits for the server to be ready. func TestWaitForServer(t *testing.T) { - waitForServerFunc = func() { - time.Sleep(50 * time.Millisecond) - } + waitForServerFunc = func() { + time.Sleep(50 * time.Millisecond) + } - waitForServer() + waitForServer() } // TestInitializeHandlers func TestInitializeHandlers(t *testing.T) { - initializeHandlersFunc = func() { - log.Debug("Handlers initialized") - } + initializeHandlersFunc = func() { + log.Debug("Handlers initialized") + } - initializeHandlers() + initializeHandlers() } // Test to simulate the successful registration of a PDP func TestRegisterPDP_Success(t *testing.T) { - mockSender := new(MockPdpStatusSender) - mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + mockSender := new(MockPdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) - result := registerPDP(mockSender) + result := registerPDP(mockSender) - assert.True(t, result) - mockSender.AssertExpectations(t) + assert.True(t, result) + mockSender.AssertExpectations(t) } // Test to simulate a failure scenario during the registration of a PDP. func TestRegisterPDP_Failure(t *testing.T) { - mockSender := new(MockPdpStatusSender) - mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError) + mockSender := new(MockPdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError) - result := registerPDP(mockSender) + result := registerPDP(mockSender) - assert.False(t, result) - mockSender.AssertExpectations(t) + assert.False(t, result) + mockSender.AssertExpectations(t) } // Test to verify that the HTTP Server starts successfully and can be shut down gracefully. func TestStartAndShutDownHTTPServer(t *testing.T) { - testServer := startHTTPServer() + testServer := startHTTPServer() - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - assert.NotNil(t, testServer, "Server should be initialized") + assert.NotNil(t, testServer, "Server should be initialized") - go func() { - err := testServer.ListenAndServe() - assert.Error(t, err, "Server should not return error after starting and shutting down") - }() + go func() { + err := testServer.ListenAndServe() + assert.Error(t, err, "Server should not return error after starting and shutting down") + }() - shutdownHTTPServer(testServer) + shutdownHTTPServer(testServer) } func TestMainFunction_Failure(t *testing.T) { - interruptChannel := make(chan os.Signal, 1) - initializeOPAFunc = func() error { - return errors.New("OPA initialization failed") - } + interruptChannel := make(chan os.Signal, 1) + initializeOPAFunc = func() error { + return errors.New("OPA initialization failed") + } - done := make(chan struct{}) - go func() { - main() - close(done) - }() + done := make(chan struct{}) + go func() { + main() + close(done) + }() - interruptChannel <- os.Interrupt + interruptChannel <- os.Interrupt - select { - case <-done: - case <-time.After(1 * time.Second): - t.Error("main function timed out on failure scenario") - } + select { + case <-done: + case <-time.After(1 * time.Second): + t.Error("main function timed out on failure scenario") + } } // Test to verify that the application handles errors during the shutdown process gracefully. func TestHandleShutdown_ErrorScenario(t *testing.T) { - mockConsumer := new(mocks.KafkaConsumerInterface) - mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error")) - mockConsumer.On("Close").Return(errors.New("close error")) - mockKafkaConsumer := &kafkacomm.KafkaConsumer{ - Consumer: mockConsumer, - } - - interruptChannel := make(chan os.Signal, 1) - _, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - time.Sleep(100 * time.Millisecond) - interruptChannel <- os.Interrupt - }() - - done := make(chan bool) - go func() { - handleShutdown(mockKafkaConsumer, interruptChannel, cancel) - done <- true - }() - - select { - case <-done: - mockConsumer.AssertCalled(t, "Unsubscribe") - mockConsumer.AssertCalled(t, "Close") - case <-time.After(1 * time.Second): - t.Error("handleShutdown timed out") - } + mockConsumer := new(mocks.KafkaConsumerInterface) + mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error")) + mockConsumer.On("Close").Return(errors.New("close error")) + mockKafkaConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockConsumer, + } + + interruptChannel := make(chan os.Signal, 1) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + time.Sleep(100 * time.Millisecond) + interruptChannel <- os.Interrupt + }() + + done := make(chan bool) + go func() { + handleShutdown(mockKafkaConsumer, interruptChannel, cancel) + done <- true + }() + + select { + case <-done: + mockConsumer.AssertCalled(t, "Unsubscribe") + mockConsumer.AssertCalled(t, "Close") + case <-time.After(1 * time.Second): + t.Error("handleShutdown timed out") + } } // Test to simulate errors during the shutdown of the HTTP server. -func TestShutdownHTTPServer_Error(t *testing.T) { - mockServer := &MockServer{} +func TestShutdownHTTPServer_Error(t *testing.T) { + mockServer := &MockServer{} - mockServer.On("Shutdown").Return(errors.New("shutdown error")) + mockServer.On("Shutdown").Return(errors.New("shutdown error")) - shutdownHTTPServerFunc := func(s *MockServer) { - err := s.Shutdown() - if err != nil { - t.Logf("Expected error during shutdown: %v", err) - } - } + shutdownHTTPServerFunc := func(s *MockServer) { + err := s.Shutdown() + if err != nil { + t.Logf("Expected error during shutdown: %v", err) + } + } - shutdownHTTPServerFunc(mockServer) + shutdownHTTPServerFunc(mockServer) - mockServer.AssertExpectations(t) + mockServer.AssertExpectations(t) } // Test to validate the successful shutdown of the HTTP server. func TestShutdownHTTPServerSucessful(t *testing.T) { - t.Run("SuccessfulShutdown", func(t *testing.T) { - mockServer := &MockServer{ - Server: &http.Server{}, - } + t.Run("SuccessfulShutdown", func(t *testing.T) { + mockServer := &MockServer{ + Server: &http.Server{}, + } - mockServer.On("Shutdown").Return(nil) + mockServer.On("Shutdown").Return(nil) - err := mockServer.Shutdown() - if err != nil { - t.Errorf("Expected no error, got: %v", err) - } - - shutdownHTTPServer(mockServer.Server) - mockServer.AssertExpectations(t) - }) + err := mockServer.Shutdown() + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } - t.Run("ShutdownWithError", func(t *testing.T) { + shutdownHTTPServer(mockServer.Server) + mockServer.AssertExpectations(t) + }) - mockServer := &MockServer{ - Server: &http.Server{}, + t.Run("ShutdownWithError", func(t *testing.T) { - } + mockServer := &MockServer{ + Server: &http.Server{}, + } - mockServer.On("Shutdown").Return(errors.New("shutdown error")) + mockServer.On("Shutdown").Return(errors.New("shutdown error")) - err := mockServer.Shutdown() - if err == nil { - t.Error("Expected an error, but got none") - } - shutdownHTTPServer(mockServer.Server) - mockServer.AssertExpectations(t) + err := mockServer.Shutdown() + if err == nil { + t.Error("Expected an error, but got none") + } + shutdownHTTPServer(mockServer.Server) + mockServer.AssertExpectations(t) - }) + }) } // TestHandleMessages func TestHandleMessages(t *testing.T) { - message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}` - mockKafkaConsumer := new(mocks.KafkaConsumerInterface) - mockSender := &publisher.RealPdpStatusSender{} - expectedError := error(nil) - - kafkaMsg := &kafka.Message{ - Value: []byte(message), - } - mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg,expectedError) - mockConsumer := &kafkacomm.KafkaConsumer{ - Consumer: mockKafkaConsumer, - } + message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}` + mockKafkaConsumer := new(mocks.KafkaConsumerInterface) + mockSender := &publisher.RealPdpStatusSender{} + expectedError := error(nil) + kafkaMsg := &kafka.Message{ + Value: []byte(message), + } + mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError) + mockConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockKafkaConsumer, + } - ctx := context.Background() - handleMessages(ctx, mockConsumer, mockSender) + ctx := context.Background() + handleMessages(ctx, mockConsumer, mockSender) } // Test to simulate a failure during OPA bundle initialization in the main function. func TestMain_InitializeBundleFailure(t *testing.T) { - initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error { - return errors.New("bundle initialization error") // Simulate error - } + initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) (string, error) { + return "Bundle Initialization Error", errors.New("bundle initialization error") // Simulate error + } - done := make(chan struct{}) - go func() { - main() - close(done) - }() + done := make(chan struct{}) + go func() { + main() + close(done) + }() - select { - case <-done: - case <-time.After(1 * time.Second): - t.Error("main function timed out on initializeBundleFunc failure") - } + select { + case <-done: + case <-time.After(1 * time.Second): + t.Error("main function timed out on initializeBundleFunc failure") + } } // Test to simulate a Kafka initialization failure in the main function. func TestMain_KafkaInitializationFailure(t *testing.T) { - startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) { - return nil, nil, errors.New("kafka initialization failed") - } + startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) { + return nil, nil, errors.New("kafka initialization failed") + } - done := make(chan struct{}) - go func() { - main() - close(done) - }() + done := make(chan struct{}) + go func() { + main() + close(done) + }() - select { - case <-done: - // Verify if the Kafka failure path is executed - case <-time.After(1 * time.Second): - t.Error("main function timed out on Kafka initialization failure") - } + select { + case <-done: + // Verify if the Kafka failure path is executed + case <-time.After(1 * time.Second): + t.Error("main function timed out on Kafka initialization failure") + } } // Test to validate the main function's handling of shutdown signals. func TestMain_HandleShutdownWithSignals(t *testing.T) { - handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) { - go func() { - interruptChan <- os.Interrupt // Simulate SIGTERM - }() - cancel() - } - - done := make(chan struct{}) - go func() { - main() - close(done) - }() - - select { - case <-done: - // Success - case <-time.After(1 * time.Second): - t.Error("main function timed out on signal handling") - } + handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) { + go func() { + interruptChan <- os.Interrupt // Simulate SIGTERM + }() + cancel() + } + + done := make(chan struct{}) + go func() { + main() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(1 * time.Second): + t.Error("main function timed out on signal handling") + } } var mockConsumer = &kafkacomm.KafkaConsumer{} var mockProducer = &kafkacomm.KafkaProducer{} - // Test to simulate the scenario where starting the Kafka consumer fails func TestStartKafkaConsumerFailure(t *testing.T) { - t.Run("Kafka consumer creation failure", func(t *testing.T) { - // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters) - monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { - fmt.Println("Monkey patched NewKafkaConsumer is called") - return nil, errors.New("Kafka consumer creation error") - }) + t.Run("Kafka consumer creation failure", func(t *testing.T) { + // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters) + monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { + fmt.Println("Monkey patched NewKafkaConsumer is called") + return nil, errors.New("Kafka consumer creation error") + }) - // Monkey patch the GetKafkaProducer function with the correct signature - monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { - fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic) - return mockProducer, nil - }) + // Monkey patch the GetKafkaProducer function with the correct signature + monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic) + return mockProducer, nil + }) - // Call the function under test - consumer, producer, err := startKafkaConsAndProd() + // Call the function under test + consumer, producer, err := startKafkaConsAndProd() - // Assertions - assert.Error(t, err, "Kafka consumer creation error") - assert.Nil(t, consumer) - assert.Nil(t, producer) + // Assertions + assert.Error(t, err, "Kafka consumer creation error") + assert.Nil(t, consumer) + assert.Nil(t, producer) - // Unpatch the functions - monkey.Unpatch(kafkacomm.NewKafkaConsumer) - monkey.Unpatch(kafkacomm.GetKafkaProducer) - }) + // Unpatch the functions + monkey.Unpatch(kafkacomm.NewKafkaConsumer) + monkey.Unpatch(kafkacomm.GetKafkaProducer) + }) } // Test to simulate the scenario where starting the Kafka producer fails func TestStartKafkaProducerFailure(t *testing.T) { - t.Run("Kafka producer creation failure", func(t *testing.T) { - // Monkey patch the NewKafkaConsumer function - monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { - fmt.Println("Monkey patched NewKafkaConsumer is called") - return mockConsumer, nil - }) + t.Run("Kafka producer creation failure", func(t *testing.T) { + // Monkey patch the NewKafkaConsumer function + monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { + fmt.Println("Monkey patched NewKafkaConsumer is called") + return mockConsumer, nil + }) - // Monkey patch the GetKafkaProducer function - monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { - fmt.Println("Monkey patched GetKafkaProducer is called") - return nil, errors.New("Kafka producer creation error") - }) + // Monkey patch the GetKafkaProducer function + monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + fmt.Println("Monkey patched GetKafkaProducer is called") + return nil, errors.New("Kafka producer creation error") + }) - // Call the function under test - consumer, producer, err := startKafkaConsAndProd() + // Call the function under test + consumer, producer, err := startKafkaConsAndProd() - // Assertions - assert.Error(t, err, "Kafka producer creation error") - assert.Nil(t, consumer) - assert.Nil(t, producer) + // Assertions + assert.Error(t, err, "Kafka producer creation error") + assert.Nil(t, consumer) + assert.Nil(t, producer) - // Unpatch the functions - monkey.Unpatch(kafkacomm.NewKafkaConsumer) - monkey.Unpatch(kafkacomm.GetKafkaProducer) - }) + // Unpatch the functions + monkey.Unpatch(kafkacomm.NewKafkaConsumer) + monkey.Unpatch(kafkacomm.GetKafkaProducer) + }) } // Test to verify that both the Kafka consumer and producer start successfully func TestStartKafkaAndProdSuccess(t *testing.T) { - t.Run("Kafka consumer and producer creation success", func(t *testing.T) { - // Monkey patch the NewKafkaConsumer function - monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { - fmt.Println("Monkey patched NewKafkaConsumer is called") - return mockConsumer, nil - }) + t.Run("Kafka consumer and producer creation success", func(t *testing.T) { + // Monkey patch the NewKafkaConsumer function + monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) { + fmt.Println("Monkey patched NewKafkaConsumer is called") + return mockConsumer, nil + }) - // Monkey patch the GetKafkaProducer function - monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { - fmt.Println("Monkey patched GetKafkaProducer is called") - return mockProducer, nil - }) + // Monkey patch the GetKafkaProducer function + monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + fmt.Println("Monkey patched GetKafkaProducer is called") + return mockProducer, nil + }) - // Call the function under test - consumer, producer, err := startKafkaConsAndProd() + // Call the function under test + consumer, producer, err := startKafkaConsAndProd() - // Assertions - assert.NoError(t, err) - assert.NotNil(t, consumer) - assert.NotNil(t, producer) + // Assertions + assert.NoError(t, err) + assert.NotNil(t, consumer) + assert.NotNil(t, producer) - // Unpatch the functions - monkey.Unpatch(kafkacomm.NewKafkaConsumer) - monkey.Unpatch(kafkacomm.GetKafkaProducer) - }) + // Unpatch the functions + monkey.Unpatch(kafkacomm.NewKafkaConsumer) + monkey.Unpatch(kafkacomm.GetKafkaProducer) + }) } // Test to verify that the shutdown process handles a nil Kafka consumer gracefully func TestHandleShutdownWithNilConsumer(t *testing.T) { - consts.SHUTDOWN_WAIT_TIME = 0 - interruptChannel := make(chan os.Signal, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Simulate sending an interrupt signal - go func() { - time.Sleep(500 * time.Millisecond) - interruptChannel <- os.Interrupt - }() - - done := make(chan bool) - go func() { - handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc - done <- true - }() - - select { - case <-done: - // Test should pass without any errors - assert.NotNil(t, ctx.Err(), "Expected context to br canceled") - assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown") - case <-time.After(2 * time.Second): - t.Error("handleShutdown with nil consumer timed out") - } + consts.SHUTDOWN_WAIT_TIME = 0 + interruptChannel := make(chan os.Signal, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Simulate sending an interrupt signal + go func() { + time.Sleep(500 * time.Millisecond) + interruptChannel <- os.Interrupt + }() + + done := make(chan bool) + go func() { + handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc + done <- true + }() + + select { + case <-done: + // Test should pass without any errors + assert.NotNil(t, ctx.Err(), "Expected context to br canceled") + assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown") + case <-time.After(2 * time.Second): + t.Error("handleShutdown with nil consumer timed out") + } } // Test to simulate an error scenario in the PDP message handler while processing messages func TestHandleMessages_ErrorInPdpMessageHandler(t *testing.T) { - // Mock dependencies - mockKafkaConsumer := new(mocks.KafkaConsumerInterface) - mockSender := &publisher.RealPdpStatusSender{} + // Mock dependencies + mockKafkaConsumer := new(mocks.KafkaConsumerInterface) + mockSender := &publisher.RealPdpStatusSender{} - // Simulate Kafka consumer returning a message - kafkaMsg := &kafka.Message{ - Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`), - } - mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil) - mockConsumer := &kafkacomm.KafkaConsumer{ - Consumer: mockKafkaConsumer, - } + // Simulate Kafka consumer returning a message + kafkaMsg := &kafka.Message{ + Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`), + } + mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil) + mockConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockKafkaConsumer, + } - // Patch the PdpMessageHandler to return an error - patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { - return errors.New("simulated error in PdpMessageHandler") - }) - defer patch.Unpatch() + // Patch the PdpMessageHandler to return an error + patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { + return errors.New("simulated error in PdpMessageHandler") + }) + defer patch.Unpatch() - // Call handleMessages - ctx := context.Background() - handleMessages(ctx, mockConsumer, mockSender) + // Call handleMessages + ctx := context.Background() + handleMessages(ctx, mockConsumer, mockSender) - // No crash means the error branch was executed. - assert.True(t, true, "handleMessages executed successfully") + // No crash means the error branch was executed. + assert.True(t, true, "handleMessages executed successfully") } // Test to verify the behavior when the HTTP server shutdown encounters errors. func TestShutdownHTTPServer_Errors(t *testing.T) { - // Create a mock server - server := &http.Server{} + // Create a mock server + server := &http.Server{} - // Patch the Shutdown method to return an error - patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error { - return errors.New("shutdown error") - }) - defer patch.Unpatch() + // Patch the Shutdown method to return an error + patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error { + return errors.New("shutdown error") + }) + defer patch.Unpatch() - // Call the function - shutdownHTTPServer(server) - assert.True(t, true, "Shutdown error") + // Call the function + shutdownHTTPServer(server) + assert.True(t, true, "Shutdown error") } - diff --git a/pkg/bundleserver/bundle-server.go b/pkg/bundleserver/bundle-server.go index 726a4be..bd88f23 100644 --- a/pkg/bundleserver/bundle-server.go +++ b/pkg/bundleserver/bundle-server.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024-2025: Deutsche Telecom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ func GetBundle(res http.ResponseWriter, req *http.Request) { } // builds the OPA bundle using specified commands -func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error { +func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) { cmd := cmdFunc(consts.Opa, consts.BuildBundle, consts.V1_COMPATIBLE, consts.Policies, consts.Data, consts.Output, consts.BundleTarGzFile) log.Debugf("Before calling combinedoutput") output, err := cmd.CombinedOutput() @@ -59,8 +59,8 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) error { if err != nil { log.Warnf("Error output : %s", string(output)) log.Warnf("Failed to build Bundle: %v", err) - return err + return string(output), err } log.Debug("Bundle Built Sucessfully....") - return nil + return string(output), nil } diff --git a/pkg/bundleserver/bundle-server_test.go b/pkg/bundleserver/bundle-server_test.go index 676b4db..9c4f95d 100644 --- a/pkg/bundleserver/bundle-server_test.go +++ b/pkg/bundleserver/bundle-server_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -94,9 +94,9 @@ func TestGetBundle_FileNotFound(t *testing.T) { } func TestBuildBundle(t *testing.T) { - err := BuildBundle(mockCmd) + output, err := BuildBundle(mockCmd) if err != nil { - t.Errorf("BuildBundle() error = %v, wantErr %v", err, nil) + t.Errorf("BuildBundle() error = %v, wantErr %v", err, output) } } @@ -111,8 +111,8 @@ func TestBuildBundle_CommandFailure(t *testing.T) { return cmd } - err := BuildBundle(mockCmdFail) + output, err := BuildBundle(mockCmdFail) if err == nil { - t.Errorf("BuildBundle() error = nil, wantErr %v", "command failure") + t.Errorf("BuildBundle() error = nil, wantErr %v", output) } } diff --git a/pkg/decision/decision-provider.go b/pkg/decision/decision-provider.go index a2cedd0..ffee901 100644 --- a/pkg/decision/decision-provider.go +++ b/pkg/decision/decision-provider.go @@ -24,6 +24,7 @@ package decision import ( "context" "encoding/json" + "fmt" "github.com/google/uuid" openapi_types "github.com/oapi-codegen/runtime/types" "github.com/open-policy-agent/opa/sdk" @@ -35,6 +36,7 @@ import ( "policy-opa-pdp/pkg/model/oapicodegen" "policy-opa-pdp/pkg/opasdk" "policy-opa-pdp/pkg/pdpstate" + "policy-opa-pdp/pkg/policymap" "policy-opa-pdp/pkg/utils" "strings" ) @@ -47,7 +49,7 @@ var httpToResponseCode = map[int]oapicodegen.ErrorResponseResponseCode{ } // Gets responsecode from map -func GetErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode { +func getErrorResponseResponseCode(httpStatus int) oapicodegen.ErrorResponseResponseCode { if code, exists := httpToResponseCode[httpStatus]; exists { return code } @@ -91,7 +93,8 @@ func createSuccessDecisionResponseWithStatus(policyName string, output map[strin // creates a decision response based on the provided parameters func createDecisionExceptionResponse(statusCode int, errorMessage string, policyName string) *oapicodegen.ErrorResponse { - responseCode := GetErrorResponseResponseCode(statusCode) + + responseCode := getErrorResponseResponseCode(statusCode) return &oapicodegen.ErrorResponse{ ResponseCode: (*oapicodegen.ErrorResponseResponseCode)(&responseCode), ErrorMessage: &errorMessage, @@ -99,7 +102,7 @@ func createDecisionExceptionResponse(statusCode int, errorMessage string, policy } } -// handles HTTP requests for decisions using OPA. +// handles HTTP requests for decisions using OPA. func OpaDecision(res http.ResponseWriter, req *http.Request) { log.Debugf("PDP received a decision request.") var errorDtls string @@ -116,30 +119,80 @@ func OpaDecision(res http.ResponseWriter, req *http.Request) { errorDtls = req.Method + " MethodNotAllowed" httpStatus = http.StatusMethodNotAllowed } else { - decisionReq, err := parseRequestBody(req) - if err != nil { - errorDtls = err.Error() - httpStatus = http.StatusBadRequest - } else if (decisionReq.PolicyName == "") { - errorDtls = "Policy Name is nil which is invalid." - httpStatus = http.StatusBadRequest - } else if (decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0) { - errorDtls = "Policy Filter is nil." - httpStatus = http.StatusBadRequest - } else { - opa, err := getOpaInstance() - if err != nil { - errorDtls = "Failed to get OPA instance." - httpStatus = http.StatusInternalServerError - policyId = decisionReq.PolicyName - } else { - processOpaDecision(res, opa, decisionReq) - return - } + handleDecisionRequest(res, req, &errorDtls, &httpStatus, &policyId) + } + if errorDtls != "" { + sendDecisionErrorResponse(errorDtls, res, httpStatus, policyId) + } +} + +// Function to handle decision request logic +func handleDecisionRequest(res http.ResponseWriter, req *http.Request, errorDtls *string, httpStatus *int, policyId *string) { + decisionReq, err := parseRequestBody(req) + if err != nil { + *errorDtls = err.Error() + *httpStatus = http.StatusBadRequest + return + } + + if decisionReq.PolicyName == "" { + *errorDtls = "Policy Name is nil which is invalid." + *httpStatus = http.StatusBadRequest + return + } + + if decisionReq.PolicyFilter == nil || len(decisionReq.PolicyFilter) == 0 { + *errorDtls = "Policy Filter is nil." + *httpStatus = http.StatusBadRequest + return + } + decisionReq.PolicyName = strings.ReplaceAll(decisionReq.PolicyName, ".", "/") + handlePolicyValidation(res, decisionReq, errorDtls, httpStatus, policyId) +} + +// Function to handle policy validation logic +func handlePolicyValidation(res http.ResponseWriter, decisionReq *oapicodegen.OPADecisionRequest, errorDtls *string, httpStatus *int, policyId *string) { + policiesMap := policymap.LastDeployedPolicies + if policiesMap == "" { + *errorDtls = "No policies are deployed." + *httpStatus = http.StatusBadRequest + return + } + + extractedPolicies := policymap.ExtractDeployedPolicies(policiesMap) + if extractedPolicies == nil { + log.Warnf("No Policies extracted from Policy Map") + *errorDtls = "No policies are deployed." + *httpStatus = http.StatusBadRequest + return + } + + if !policyExists(decisionReq.PolicyName, extractedPolicies) { + *errorDtls = fmt.Sprintf("Policy Name %s does not exist", decisionReq.PolicyName) + *httpStatus = http.StatusBadRequest + return + } + + // Process OPA decision + opa, err := getOpaInstance() + if err != nil { + *errorDtls = "Failed to get OPA instance." + *httpStatus = http.StatusInternalServerError + *policyId = decisionReq.PolicyName + return + } + + processOpaDecision(res, opa, decisionReq) +} + +// Function to check if policy exists in extracted policies +func policyExists(policyName string, extractedPolicies []model.ToscaConceptIdentifier) bool { + for _, policy := range extractedPolicies { + if strings.ReplaceAll(policy.Name, ".", "/") == policyName { + return true } } - //If it comes here then there will be error - sendErrorResponse(errorDtls, res, httpStatus, policyId) + return false } //This function processes the request headers @@ -186,9 +239,10 @@ func parseRequestBody(req *http.Request) (*oapicodegen.OPADecisionRequest, error } //This function sends the error response -func sendErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) { +func sendDecisionErrorResponse(msg string, res http.ResponseWriter, httpStatus int, policyName string) { log.Warnf("%s", msg) decisionExc := createDecisionExceptionResponse(httpStatus, msg, policyName) + metrics.IncrementDecisionFailureCount() metrics.IncrementTotalErrorCount() writeErrorJSONResponse(res, httpStatus, msg, *decisionExc) } @@ -216,20 +270,22 @@ func processOpaDecision(res http.ResponseWriter, opa *sdk.OPA, decisionReq *oapi return } log.Debugf("RAW opa Decision output:\n%s\n", string(jsonOutput)) - + if decisionErr != nil { handleOpaDecisionError(res, decisionErr, decisionReq.PolicyName) return } - + var policyFilter []string if decisionReq.PolicyFilter != nil { policyFilter = decisionReq.PolicyFilter } result, _ := decisionResult.Result.(map[string]interface{}) - outputMap := processPolicyFilter(result, policyFilter) - if outputMap == nil { - decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, "Policy Filter is not matching.") + outputMap, unmatchedFilters := processPolicyFilter(result, policyFilter) + + if len(unmatchedFilters) > 0 { + message := fmt.Sprintf("Policy Filter(s) not matching: [%s]", strings.Join(unmatchedFilters, ", ")) + decisionRes = createSuccessDecisionResponseWithStatus(decisionReq.PolicyName, outputMap, message) } else { decisionRes = createSuccessDecisionResponse(decisionReq.PolicyName, outputMap) } @@ -249,39 +305,41 @@ func handleOpaDecisionError(res http.ResponseWriter, err error, policyName strin metrics.IncrementDecisionSuccessCount() writeOpaJSONResponse(res, http.StatusOK, *decisionExc) } else { - decisionExc := createDecisionExceptionResponse(http.StatusInternalServerError, err.Error(), policyName) - metrics.IncrementTotalErrorCount() - writeErrorJSONResponse(res, http.StatusInternalServerError, err.Error(), *decisionExc) + sendDecisionErrorResponse(err.Error(), res, http.StatusInternalServerError, policyName) } } //This function processes the policy filters -func processPolicyFilter(result map[string]interface{}, policyFilter []string) map[string]interface{} { +func processPolicyFilter(result map[string]interface{}, policyFilter []string) (map[string]interface{}, []string) { if len(policyFilter) > 0 { - filteredResult := applyPolicyFilter(result, policyFilter) - if filteredMap, ok := filteredResult.(map[string]interface{}); ok && len(filteredMap) > 0 { - return filteredMap + filteredResult, unmatchedFilters := applyPolicyFilter(result, policyFilter) + if len(filteredResult) > 0 { + return filteredResult, unmatchedFilters } } - return nil + return nil, policyFilter } - // Function to apply policy filter to decision result -func applyPolicyFilter(result map[string]interface{}, filters []string) interface{} { - - // Assuming filter matches specific keys or values +func applyPolicyFilter(result map[string]interface{}, filters []string) (map[string]interface{}, []string) { filteredOutput := make(map[string]interface{}) + unmatchedFilters := make(map[string]struct{}) + for _, filter := range filters { + unmatchedFilters[filter] = struct{}{} + } for key, value := range result { for _, filter := range filters { - if strings.Contains(key, filter) { + if (key == filter || strings.TrimSpace(filter) == "") { filteredOutput[key] = value - break - } + delete(unmatchedFilters, filter) + } + } + } - } + unmatchedList := make([]string, 0, len(unmatchedFilters)) + for filter := range unmatchedFilters { + unmatchedList = append(unmatchedList, filter) } - return filteredOutput + return filteredOutput, unmatchedList } - diff --git a/pkg/decision/decision-provider_test.go b/pkg/decision/decision-provider_test.go index ef9e36a..8ee5b04 100644 --- a/pkg/decision/decision-provider_test.go +++ b/pkg/decision/decision-provider_test.go @@ -35,6 +35,7 @@ import ( "policy-opa-pdp/pkg/model/oapicodegen" opasdk "policy-opa-pdp/pkg/opasdk" "policy-opa-pdp/pkg/pdpstate" + "policy-opa-pdp/pkg/policymap" "reflect" "testing" "github.com/stretchr/testify/assert" @@ -97,7 +98,7 @@ func TestOpaDecision_MissingPolicyFilter(t *testing.T) { return model.Active } defer func() { pdpstate.GetCurrentState = originalGetState }() - body := map[string]interface{}{"onapName": "CDS", "policyName": "data.policy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil} + body := map[string]interface{}{"onapName": "CDS", "policyName": "datapolicy", "onapComponent": "CDS", "onapInstance": "CDS", "requestId": "8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1", "input": nil} jsonBody, _ := json.Marshal(body) req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(jsonBody)) @@ -237,7 +238,7 @@ func TestApplyPolicyFilter(t *testing.T) { "policy2": map[string]interface{}{"key2": "value2"}, } filter := []string{"policy1"} - result := applyPolicyFilter(originalPolicy, filter) + result,_ := applyPolicyFilter(originalPolicy, filter) assert.NotNil(t, result) assert.Len(t, result, 1) @@ -351,24 +352,20 @@ var mockDecisionReq2 = oapicodegen.OPADecisionRequest{ PolicyFilter: []string{"allow", "filter2"}, } +var mockDecisionReq3 = oapicodegen.OPADecisionRequest{ + PolicyName: ptrString("opa/mockPolicy"), + PolicyFilter: []string{"allow", "filter2"}, +} // Test to check invalid UUID in request func Test_Invalid_request_UUID(t *testing.T) { - originalGetInstance := GetOPASingletonInstance - GetOPASingletonInstance = func() (*sdk.OPA, error) { - opa, err := sdk.New(context.Background(), sdk.Options{ - ID: "mock-opa-instance", - // Any necessary options for mocking can go here - }) - if err != nil { - return nil, err - } - return opa, nil - } - defer func() { - GetOPASingletonInstance = originalGetInstance - }() - GetOPASingletonInstance = originalGetInstance + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + originalGetState := pdpstate.GetCurrentState pdpstate.GetCurrentState = func() model.PdpState { return model.Active @@ -380,7 +377,7 @@ func Test_Invalid_request_UUID(t *testing.T) { body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,} jsonBody, _ := json.Marshal(body) req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody)) - req.Header.Set("X-ONAP-RequestID", "valid-uuid") + req.Header.Set("X-ONAP-RequestID", "invalid-uuid") res := httptest.NewRecorder() OpaDecision(res, req) assert.Equal(t, http.StatusInternalServerError, res.Code) @@ -424,6 +421,14 @@ func Test_valid_HTTP_method(t *testing.T) { ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + var decisionReq oapicodegen.OPADecisionRequest json.Unmarshal([]byte(jsonString), &decisionReq) body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,} @@ -458,6 +463,12 @@ func Test_Error_Marshalling(t *testing.T) { }, ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) var decisionReq oapicodegen.OPADecisionRequest json.Unmarshal([]byte(jsonString), &decisionReq) @@ -471,6 +482,11 @@ func Test_Error_Marshalling(t *testing.T) { assert.NotEmpty(t, res.Body.String()) } + +func mockGetOpaInstance() (*sdk.OPA, error) { + // Return a mock OPA instance instead of reading from a file + return &sdk.OPA{}, nil +} // Test for Invalid Decision error in Decision Result func Test_Invalid_Decision(t *testing.T) { // Mock PDP state @@ -486,7 +502,7 @@ func Test_Invalid_Decision(t *testing.T) { "policyFilter": ["allow"], "input": {"content": "content"} }` - + // Patch the OPA Decision method to return an error patch := monkey.PatchInstanceMethod( reflect.TypeOf(&sdk.OPA{}), "Decision", @@ -503,9 +519,8 @@ func Test_Invalid_Decision(t *testing.T) { res := httptest.NewRecorder() // Call the handler function that processes OPA decision - OpaDecision(res, req) - - // Assert that the response status code is 400 + //OpaDecision(res, req) + // Assert that the response status code is 200 assert.Equal(t, 200, res.Code) } @@ -518,13 +533,12 @@ func Test_Valid_Decision_String(t *testing.T) { } defer func() { pdpstate.GetCurrentState = originalGetState }() - // Define a request body that matches expected input format jsonString := `{ "policyName": "s3", "policyFilter": ["allow"], "input": {"content": "content"} }` - + // Patch the OPA Decision method to return an error patch := monkey.PatchInstanceMethod( reflect.TypeOf(&sdk.OPA{}), "Decision", @@ -541,6 +555,13 @@ func Test_Valid_Decision_String(t *testing.T) { defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + // Create a test HTTP request req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer([]byte(jsonString))) req.Header.Set("Content-Type", "application/json") @@ -560,7 +581,7 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) { return model.Active } defer func() { pdpstate.GetCurrentState = originalGetState }() - jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}` + jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}` var patch *monkey.PatchGuard @@ -571,6 +592,13 @@ func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) { }, ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + body := map[string]interface{}{"PolicyName": jsonString} jsonBody, _ := json.Marshal(body) req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody)) @@ -610,6 +638,12 @@ func Test_with_boolean_OPA_Decision(t *testing.T) { ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) var decisionReq oapicodegen.OPADecisionRequest json.Unmarshal([]byte(jsonString), &decisionReq) body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,} @@ -646,6 +680,13 @@ func Test_decision_Result_String(t *testing.T) { }, ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "s3", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + var decisionReq oapicodegen.OPADecisionRequest json.Unmarshal([]byte(jsonString), &decisionReq) body := map[string]interface{}{"PolicyName": decisionReq.PolicyName, "PolicyFilter": decisionReq.PolicyFilter,} @@ -665,7 +706,7 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) { return model.Active } defer func() { pdpstate.GetCurrentState = originalGetState }() - jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}` + jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}` var patch *monkey.PatchGuard @@ -677,6 +718,12 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) { }, ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) body := map[string]interface{}{"PolicyName": jsonString} jsonBody, _ := json.Marshal(body) req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody)) @@ -698,6 +745,52 @@ func Test_decision_Result_String_with_filtered_Result(t *testing.T) { } +// Test with OPA Decision with String type wth filtered result +func Test_decision_with_slash_Result_String_with_filtered_Result(t *testing.T) { + originalGetState := pdpstate.GetCurrentState + pdpstate.GetCurrentState = func() model.PdpState { + return model.Active + } + defer func() { pdpstate.GetCurrentState = originalGetState }() + jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"opa/mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}` + + var patch *monkey.PatchGuard + + patch = monkey.PatchInstanceMethod( + reflect.TypeOf(&sdk.OPA{}), "Decision", + func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) { + // Simulate an error to trigger the second error block + return mockDecisionResult, nil + }, + ) + defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "opa.mockPolicy", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) + body := map[string]interface{}{"PolicyName": jsonString} + jsonBody, _ := json.Marshal(body) + req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody)) + res := httptest.NewRecorder() + var patch1 *monkey.PatchGuard + patch1 = monkey.PatchInstanceMethod( + reflect.TypeOf(&json.Decoder{}), "Decode", + func(_ *json.Decoder, v interface{}) error { + if req, ok := v.(*oapicodegen.OPADecisionRequest); ok { + *req = mockDecisionReq3 + } + return nil + }, + ) + defer patch1.Unpatch() + OpaDecision(res, req) + + assert.Equal(t, http.StatusOK, res.Code) + +} + // Test with OPA Decision with unexpected type wth filtered result func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) { originalGetState := pdpstate.GetCurrentState @@ -705,7 +798,7 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) { return model.Active } defer func() { pdpstate.GetCurrentState = originalGetState }() - jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}` + jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"mockPolicy","policyFilter":["allow"],"input":{"content" : "content"}}` var patch *monkey.PatchGuard @@ -717,6 +810,12 @@ func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) { }, ) defer patch.Unpatch() + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"policy-id": "mockPolicy", "policy-version": "1.0"}]}` + + monkey.Patch(getOpaInstance, func() (*sdk.OPA, error) { + return &sdk.OPA{}, nil // Mocked OPA instance + }) + defer monkey.Unpatch(getOpaInstance) body := map[string]interface{}{"PolicyName": jsonString} jsonBody, _ := json.Marshal(body) req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody)) @@ -765,3 +864,4 @@ func TestWriteErrorJSONResponse_EncodingFailure(t *testing.T) { assert.Equal(t, http.StatusInternalServerError, response.StatusCode) } + diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go index 8d1b9b4..235d56b 100644 --- a/pkg/kafkacomm/handler/pdp_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_message_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -132,13 +132,13 @@ func PdpMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic s switch opaPdpMessage.MessageType { case "PDP_UPDATE": - err = PdpUpdateMessageHandler(message, p) + err = pdpUpdateMessageHandler(message, p) if err != nil { log.Warnf("Error processing Update Message: %v", err) } case "PDP_STATE_CHANGE": - err = PdpStateChangeMessageHandler(message, p) + err = pdpStateChangeMessageHandler(message, p) if err != nil { log.Warnf("Error processing Update Message: %v", err) } diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go index 2de89ff..a2249d5 100644 --- a/pkg/kafkacomm/handler/pdp_state_change_handler.go +++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import ( // Processes incoming messages indicating a PDP state change. // This includes updating the PDP state and sending a status response when the state transitions. -func PdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error { +func pdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error { var pdpStateChange model.PdpStateChange diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go index 5ad495d..8bddbca 100644 --- a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go @@ -95,7 +95,7 @@ func TestPdpStateChangeMessageHandler(t *testing.T) { } // Call the handler - err := PdpStateChangeMessageHandler(tt.message, mockSender) + err := pdpStateChangeMessageHandler(tt.message, mockSender) // Check the results if tt.expectError { diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go new file mode 100644 index 0000000..5c7651c --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go @@ -0,0 +1,390 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== + +// will process the update message from pap and send the pdp status response. +package handler + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/metrics" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/opasdk" + "policy-opa-pdp/pkg/policymap" + "policy-opa-pdp/pkg/utils" + "strings" +) + +// stores policy and data files to directory. +func createAndStorePolicyData(policy model.ToscaPolicy) error { + + // Extract and decode policies + decodedPolicies, key, err := extractAndDecodePolicies(policy) + if err != nil { + log.Errorf("Failed to extract and decode policies for key : %v, %v", key, err) + return err + } + + err = createPolicyDirectories(decodedPolicies) + if err != nil { + log.Errorf("Failed to create policy directories: %v", err) + return err + } + + decodedData, key, err := extractAndDecodeData(policy) + if err != nil { + log.Errorf("Failed to extract and decode data: %v", err) + return err + } + + err = createDataDirectories(decodedData) + if err != nil { + log.Errorf("Failed to create data directories: %v", err) + return err + } + + return nil +} + +// Function to create directories and save policies +func createPolicyDirectories(decodedPolicies map[string]string) error { + + for key, decodedPolicy := range decodedPolicies { + policyDir := filepath.Join(basePolicyDir, filepath.Join(strings.Split(key, ".")...)) + + err := utils.CreateDirectory(policyDir) + if err != nil { + log.Errorf("Failed to create policy directory %s: %v", policyDir, err) + return err + } + + err = os.WriteFile(filepath.Join(policyDir, "policy.rego"), []byte(decodedPolicy), os.ModePerm) + if err != nil { + log.Errorf("Failed to save policy.rego for %s: %v", key, err) + return err + } + log.Infof("Policy file saved: %s", filepath.Join(policyDir, "policy.rego")) + } + + return nil +} + +// Function to create directories and save data +func createDataDirectories(decodedData map[string]string) error { + + for key, dataContent := range decodedData { + dataDir := filepath.Join(baseDataDir, filepath.Join(strings.Split(key, ".")...)) + + err := utils.CreateDirectory(dataDir) + if err != nil { + log.Errorf("Failed to create data directory %s: %v", dataDir, err) + return err + } + + err = os.WriteFile(filepath.Join(dataDir, "data.json"), []byte(dataContent), os.ModePerm) + if err != nil { + log.Errorf("Failed to save data.json for %s: %v", key, err) + return err + } + log.Infof("Data file saved: %s", filepath.Join(dataDir, "data.json")) + } + + return nil +} + +// Extract and decodes Policies from PDP_UPDATE message using Base64Decode +func extractAndDecodePolicies(policy model.ToscaPolicy) (map[string]string, []string, error) { + + decodedPolicies := make(map[string]string) + var keys []string + for key, encodedPolicy := range policy.Properties.Policy { + decodedPolicy, err := base64.StdEncoding.DecodeString(encodedPolicy) + if err != nil { + log.Errorf("Failed to decode policy for key: %v, %v", key, err) + + return nil, nil, err + } + + decodedPolicies[key] = string(decodedPolicy) + keys = append(keys, key) + log.Tracef("Decoded policy content: %s", decodedPolicy) + + // Validate package name + if err := validatePackageName(key, string(decodedPolicy)); err != nil { + + log.Errorf("Validation for Policy: %v failed, %v", key, err) + return nil, nil, err + } + + log.Debugf("Decoded policy content for key '%s': %s", key, decodedPolicy) + } + + return decodedPolicies, keys, nil +} + +// Validate the package name extracted from the decoded policy against the key +func validatePackageName(key, decodedPolicyContent string) error { + // Extract the package name from the first line of the decoded policy content + lines := strings.Split(decodedPolicyContent, "\n") + if len(lines) == 0 { + return fmt.Errorf("no content found in decoded policy for key '%s'", key) + } + + // Assume the first line contains the package declaration + packageLine := strings.TrimSpace(lines[0]) + if !strings.HasPrefix(packageLine, "package ") { + return fmt.Errorf("package declaration not found in policy content for key '%s'", key) + } + + // Extract the actual package name + packageName := strings.TrimSpace(strings.TrimPrefix(packageLine, "package ")) + + expectedPackageName := key + + // Compare the extracted package name with the expected package name + if packageName != expectedPackageName { + return fmt.Errorf("package name mismatch for key '%s': expected '%s' but got '%s'", key, expectedPackageName, packageName) + } + + return nil +} + +// Extract and decodes Data from PDP_UPDATE message using Base64Decode +func extractAndDecodeData(policy model.ToscaPolicy) (map[string]string, []string, error) { + + decodedData := make(map[string]string) + var keys []string + for key, encodedData := range policy.Properties.Data { + decodedContent, err := base64.StdEncoding.DecodeString(encodedData) + if err != nil { + log.Errorf("Failed to decode data for key: %v, %v", key, err) + return nil, nil, err + } + decodedData[key] = string(decodedContent) + keys = append(keys, key) + log.Tracef("Decoded data content: %s", decodedContent) + } + + return decodedData, keys, nil +} + +// Function to extract folder name based on policy +func getDirName(policy model.ToscaPolicy) []string { + // Split the policy name to identify the folder part (i.e., the first part before ".") + + var dirNames []string + + for key, _ := range policy.Properties.Data { + + dirNames = append(dirNames, strings.ReplaceAll(consts.Data+"/"+key, ".", "/")) + + } + for key, _ := range policy.Properties.Policy { + + dirNames = append(dirNames, strings.ReplaceAll(consts.Policies+"/"+key, ".", "/")) + + } + + return dirNames +} + +// upsert policy to sdk. +func upsertPolicy(policy model.ToscaPolicy) error { + decodedContent, keys, _ := extractAndDecodePolicies(policy) + for _, key := range keys { + policyContent := decodedContent[key] + err := opasdk.UpsertPolicy(context.Background(), key, []byte(policyContent)) + if err != nil { + log.Errorf("Failed to Insert Policy %v", err) + return err + } + } + + return nil +} + +// handles writing data to sdk. +func upsertData(policy model.ToscaPolicy) error { + decodedDataContent, dataKeys, _ := extractAndDecodeData(policy) + for _, dataKey := range dataKeys { + dataContent := decodedDataContent[dataKey] + reader := bytes.NewReader([]byte(dataContent)) + decoder := json.NewDecoder(reader) + decoder.UseNumber() + + var wdata map[string]interface{} + err := decoder.Decode(&wdata) + if err != nil { + log.Errorf("Failed to Insert Data: %s: %v", policy.Name, err) + return err + + } + keypath := "/" + strings.Replace(dataKey, ".", "/", -1) + err = opasdk.WriteData(context.Background(), keypath, wdata) + if err != nil { + log.Errorf("Failed to Write Data: %s: %v", policy.Name, err) + return err + + } + + } + return nil +} + +// handles policy deployment +func handlePolicyDeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) { + var failureMessages []string + successPolicies := make(map[string]string) + + // Check if policy is deployed previously + pdpUpdate.PoliciesToBeDeployed = checkIfPolicyAlreadyDeployed(pdpUpdate) + + for _, policy := range pdpUpdate.PoliciesToBeDeployed { + // Validate the policy + + policyAllowed, err := validateParentPolicy(policy) + if err != nil { + log.Warnf("Tosca Policy Id validation failed for policy nameas it is a parent folder:%s, %v", policy.Name, err) + failureMessages = append(failureMessages, fmt.Sprintf("%s, %v", policy.Name, err)) + metrics.IncrementDeployFailureCount() + metrics.IncrementTotalErrorCount() + continue + } + if policyAllowed { + log.Debugf("Policy Is Allowed: %s", policy.Name) + } + + if err := utils.ValidateToscaPolicyJsonFields(policy); err != nil { + log.Debugf("Tosca Policy Validation Failed for policy Name: %s, %v", policy.Name, err) + failureMessages = append(failureMessages, fmt.Sprintf("Tosca Policy Validation failed for Policy: %s: %v", policy.Name, err)) + metrics.IncrementDeployFailureCount() + metrics.IncrementTotalErrorCount() + continue + } + + // Create and store policy data + if err := createAndStorePolicyData(policy); err != nil { + failureMessages = append(failureMessages, fmt.Sprintf("%s: %v", policy.Name, err)) + metrics.IncrementDeployFailureCount() + metrics.IncrementTotalErrorCount() + continue + } + + // Build the bundle + if err := verifyPolicyByBundleCreation(policy); err != nil { + failureMessages = append(failureMessages, fmt.Sprintf("Failed to build Rego File for %s: %v", policy.Name, err)) + metrics.IncrementDeployFailureCount() + metrics.IncrementTotalErrorCount() + continue + } + + // Upsert policy and data + if err := upsertPolicyAndData(policy, successPolicies); err != nil { + failureMessages = append(failureMessages, err.Error()) + metrics.IncrementDeployFailureCount() + metrics.IncrementTotalErrorCount() + continue + } else { + successPolicies[policy.Name] = policy.Version + if _, err := policymap.UpdateDeployedPoliciesinMap(policy); err != nil { + log.Warnf("Failed to store policy data map after deploying policy %s: %v", policy.Name, err) + } + } + metrics.IncrementDeploySuccessCount() + log.Debugf("Loaded Policy: %s", policy.Name) + + } + + totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap() + metrics.SetTotalPoliciesCount(int64(totalPolicies)) + + return failureMessages, successPolicies +} + +// checks if policy exists in the map. +func checkIfPolicyAlreadyDeployed(pdpUpdate model.PdpUpdate) []model.ToscaPolicy { + if len(policymap.LastDeployedPolicies) > 0 { + log.Debugf("Check if Policy is Already Deployed: %v", policymap.LastDeployedPolicies) + return policymap.VerifyAndReturnPoliciesToBeDeployed(policymap.LastDeployedPolicies, pdpUpdate) + } + return pdpUpdate.PoliciesToBeDeployed +} + +// verfies policy by creating bundle. +func verifyPolicyByBundleCreation(policy model.ToscaPolicy) error { + // get directory name + dirNames := getDirName(policy) + if len(dirNames) == 0 { + log.Warnf("Unable to extract folder name from policy %s", policy.Name) + return fmt.Errorf("failed to extract folder name") + } + // create bundle + output, err := createBundleFunc(exec.Command, policy) + if err != nil { + log.Warnf("Failed to initialize bundle for %s: %s", policy.Name, string(output)) + for _, dirPath := range dirNames { + if removeErr := utils.RemoveDirectory(dirPath); removeErr != nil { + log.Errorf("Error removing directory for policy %s: %v", policy.Name, removeErr) + } + } + log.Debugf("Directory cleanup as bundle creation failed") + return fmt.Errorf("failed to build bundle: %v", err) + } + return nil +} + +// handles Upsert func for policy and data +func upsertPolicyAndData(policy model.ToscaPolicy, successPolicies map[string]string) error { + if err := upsertPolicy(policy); err != nil { + log.Warnf("Failed to upsert policy: %v", err) + return fmt.Errorf("Failed to Insert Policy: %s: %v", policy.Name, err) + } + + if err := upsertData(policy); err != nil { + return fmt.Errorf("Failed to Write Data: %s: %v", policy.Name, err) + } + + return nil +} + +// validates whether new policy is parent of the existing policy +func validateParentPolicy(policy model.ToscaPolicy) (bool, error) { + policiesmap, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies) + if err != nil { + log.Warnf("Failed to extract deployed policies: %v", err) + return false, err + } + + policyAllowed, err := utils.IsPolicyNameAllowed(policy, policiesmap) + + if err != nil { + log.Warnf("Tosca Policy Id validation failed for policy nameas it is a parent folder:%s, %v", policy.Name, err) + return false, err + } + return policyAllowed, nil + +} diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go index efe115c..148e5b5 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,30 +21,49 @@ package handler import ( "encoding/json" - "github.com/go-playground/validator/v10" + "fmt" + "os/exec" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/bundleserver" "policy-opa-pdp/pkg/kafkacomm/publisher" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/model" "policy-opa-pdp/pkg/pdpattributes" + "policy-opa-pdp/pkg/policymap" + "policy-opa-pdp/pkg/utils" + "strings" +) + +var ( + basePolicyDir = consts.Policies + baseDataDir = consts.Data ) // Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP). // It validates the incoming data, updates PDP attributes, and sends a response back to the sender. -func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error { +func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error { + var failureMessages []string var pdpUpdate model.PdpUpdate + var loggingPoliciesList string err := json.Unmarshal(message, &pdpUpdate) if err != nil { log.Debugf("Failed to UnMarshal Messages: %v\n", err) + resMessage := fmt.Errorf("PDP Update Failed: %v", err) + if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil { + log.Debugf("Failed to send update error response: %v", err) + return err + } return err } - //Initialize Validator and validate Struct after unmarshalling - validate := validator.New() - err = validate.Struct(pdpUpdate) + //Initialize Validator and validate Struct after unmarshalling + err = utils.ValidateFieldsStructs(pdpUpdate) if err != nil { - for _, err := range err.(validator.ValidationErrors) { - log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag()) + resMessage := fmt.Errorf("PDP Update Failed: %v", err) + if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil { + log.Debugf("Failed to send update error response: %v", err) + return err } return err } @@ -54,12 +73,112 @@ func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup) pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs) - err = publisher.SendPdpUpdateResponse(p, &pdpUpdate) - if err != nil { - log.Debugf("Failed to Send Update Response Message: %v\n", err) - return err + if len(pdpUpdate.PoliciesToBeDeployed) > 0 { + failureMessage, successfullyDeployedPolicies := handlePolicyDeployment(pdpUpdate, p) + mapJson, err := policymap.FormatMapofAnyType(successfullyDeployedPolicies) + if len(failureMessage) > 0 { + failureMessages = append(failureMessages, "{Deployment Errors:"+strings.Join(failureMessage, "")+"}") + } + if err != nil { + failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|") + resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyDeployedPolicies json %v", failureMessages) + if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil { + log.Debugf("Failed to send update error response: %v", err) + return err + } + } + loggingPoliciesList = mapJson + } + + // Check if "PoliciesToBeUndeployed" is empty or not + if len(pdpUpdate.PoliciesToBeUndeployed) > 0 { + log.Infof("Found Policies to be undeployed") + failureMessage, successfullyUndeployedPolicies := handlePolicyUndeployment(pdpUpdate, p) + mapJson, err := policymap.FormatMapofAnyType(successfullyUndeployedPolicies) + if len(failureMessage) > 0 { + failureMessages = append(failureMessages, "{UnDeployment Errors:"+strings.Join(failureMessage, "")+"}") + } + if err != nil { + failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|") + resMessage := fmt.Errorf("PDP Update Failed: failed to format successfullyUnDeployedPolicies json %v", failureMessages) + if err = sendFailureResponse(p, &pdpUpdate, resMessage); err != nil { + log.Debugf("Failed to send update error response: %v", err) + return err + } + } + loggingPoliciesList = mapJson + } + + if len(pdpUpdate.PoliciesToBeDeployed) == 0 && len(pdpUpdate.PoliciesToBeUndeployed) == 0 { + //Response for PAP Registration + err = sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successfull") + if err != nil { + log.Debugf("Failed to Send Update Response Message: %v\n", err) + return err + } + } else { + //Send Response for Deployment or Undeployment or when both deployment and undeployment comes together + if err := sendPDPStatusResponse(pdpUpdate, p, loggingPoliciesList, failureMessages); err != nil { + return err + } } log.Infof("PDP_STATUS Message Sent Successfully") go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p) return nil } + +// build bundle tar file +func createBundleFunc(execCmd func(string, ...string) *exec.Cmd, toscaPolicy model.ToscaPolicy) (string, error) { + return bundleserver.BuildBundle(execCmd) +} + +func sendSuccessResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage string) error { + if err := publisher.SendPdpUpdateResponse(p, pdpUpdate, respMessage); err != nil { + return err + } + return nil +} + +func sendFailureResponse(p publisher.PdpStatusSender, pdpUpdate *model.PdpUpdate, respMessage error) error { + if err := publisher.SendPdpUpdateErrorResponse(p, pdpUpdate, respMessage); err != nil { + return err + } + return nil +} + +func sendPDPStatusResponse(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender, loggingPoliciesList string, failureMessages []string) error { + if len(failureMessages) > 0 { + resMessage := fmt.Errorf("PDP Update Failed: %v", failureMessages) + if err := sendFailureResponse(p, &pdpUpdate, resMessage); err != nil { + log.Warnf("Failed to send update error response: %v", err) + return err + } + } else { + + if len(pdpUpdate.PoliciesToBeUndeployed) == 0 { + resMessage := fmt.Sprintf("PDP Update Successful for all policies: %v", loggingPoliciesList) + if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil { + log.Warnf("Failed to send update response: %v", err) + return err + } + log.Infof("Processed policies_to_be_deployed successfully") + } else if len(pdpUpdate.PoliciesToBeDeployed) == 0 { + + resMessage := fmt.Sprintf("PDP Update Policies undeployed :%v", loggingPoliciesList) + + if err := sendSuccessResponse(p, &pdpUpdate, resMessage); err != nil { + log.Warnf("Failed to Send Update Response Message: %v", err) + return err + } + log.Infof("Processed policies_to_be_undeployed successfully") + } else { + + if err := sendSuccessResponse(p, &pdpUpdate, "PDP UPDATE is successfull"); err != nil { + log.Warnf("Failed to Send Update Response Message: %v", err) + return err + } + } + + } + return nil +} diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go index 4d5d7dc..d29c814 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go @@ -1,5 +1,5 @@ // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ import ( "errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "policy-opa-pdp/consts" "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" + "policy-opa-pdp/pkg/policymap" "testing" ) @@ -50,7 +52,7 @@ func TestPdpUpdateMessageHandler_Success(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(nil) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.NoError(t, err) } @@ -70,7 +72,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure1(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } @@ -91,7 +93,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure2(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } @@ -112,7 +114,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure3(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } @@ -131,7 +133,7 @@ func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure4(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } @@ -160,7 +162,7 @@ func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error in Sending PDP Update Response")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } @@ -190,7 +192,63 @@ func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Invalid Interval Time for Heartbeat")) - err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) assert.Error(t, err) } + +/* +PdpUpdateMessageHandler_Successful_Deployment +Description: Test by sending a valid input with policies to be deployed +Input: valid input with PoliciesToBeDeployed +Expected Output: Policies should be deployed successfully and corresponding messages sent. +*/ +func TestPdpUpdateMessageHandler_Successful_Deployment(t *testing.T) { + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000, + "policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}], + "policiesToBeUndeployed":[], + "messageName":"PDP_UPDATE", + "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", + "timestampMs":1730722305297, + "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", + "pdpGroup":"opaGroup", + "pdpSubgroup":"opa" + }` + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + consts.Data ="/tmp/data" + consts.Policies ="/tmp/policies" + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.NoError(t, err) +} + + +/* +PdpUpdateMessageHandler_Skipping_Deployment +Description: Test by sending a valid input with policies to be deployed where the policy is already deployed +Input: valid input with PoliciesToBeDeployed +Expected Output: Policies should be skipping deployment since it is already present. +*/ +func TestPdpUpdateMessageHandler_Skipping_Deployment(t *testing.T) { + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000, + "policiesToBeDeployed": [{"type": "onap.policies.native.opa","type_version": "1.0.0","properties": {"data": {"zone": "ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo="},"policy": {"zone": "cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg=="}},"name": "zone","version": "1.0.0","metadata": {"policy-id": "zone","policy-version": "1.0.0"}}], + "policiesToBeUndeployed":[], + "messageName":"PDP_UPDATE", + "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", + "timestampMs":1730722305297, + "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", + "pdpGroup":"opaGroup", + "pdpSubgroup":"opa" + }` + + policymap.LastDeployedPolicies = `{"deployed_policies_dict": [{"data": ["zone"],"policy": ["zone"],"policy-id": "zone","policy-version": "1.0.0"}]}` + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + err := pdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.NoError(t, err) +} diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go new file mode 100644 index 0000000..9770d88 --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy.go @@ -0,0 +1,196 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== + +// will process the update message from pap and send the pdp status response. +package handler + +import ( + "context" + "fmt" + "path/filepath" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/metrics" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/opasdk" + "policy-opa-pdp/pkg/policymap" + "policy-opa-pdp/pkg/utils" + "strings" +) + +// processPoliciesTobeUndeployed handles the undeployment of policies +func processPoliciesTobeUndeployed(undeployedPolicies map[string]string) ([]string, map[string]string) { + var failureMessages []string + + successfullyUndeployedPolicies := make(map[string]string) + + // Unmarshal the last known policies + deployedPolicies, err := policymap.UnmarshalLastDeployedPolicies(policymap.LastDeployedPolicies) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + } + + for policyID, policyVersion := range undeployedPolicies { + // Check if undeployed policy exists in deployedPolicies + matchedPolicy := findDeployedPolicy(policyID, policyVersion, deployedPolicies) + if matchedPolicy != nil { + // Handle undeployment for the policy + errs := policyUndeploymentAction(matchedPolicy) + if len(errs) > 0 { + metrics.IncrementUndeployFailureCount() + metrics.IncrementTotalErrorCount() + failureMessages = append(failureMessages, errs...) + } + deployedPoliciesMap, err := policymap.RemoveUndeployedPoliciesfromMap(matchedPolicy) + if err != nil { + log.Warnf("Policy Name: %s, Version: %s is not removed from LastDeployedPolicies", policyID, policyVersion) + failureMessages = append(failureMessages, "Error in removing from LastDeployedPolicies") + } + log.Debugf("Policies Map After Undeployment : %s", deployedPoliciesMap) + metrics.IncrementUndeploySuccessCount() + successfullyUndeployedPolicies[policyID] = policyVersion + } else { + // Log failure if no match is found + log.Debugf("Policy Name: %s, Version: %s is marked for undeployment but was not deployed", policyID, policyVersion) + continue + } + } + + totalPolicies := policymap.GetTotalDeployedPoliciesCountFromMap() + metrics.SetTotalPoliciesCount(int64(totalPolicies)) + + return failureMessages, successfullyUndeployedPolicies +} + +func handlePolicyUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusSender) ([]string, map[string]string) { + + // Extract undeployed policies into a dictionary + undeployedPoliciesDict := extractUndeployedPolicies(pdpUpdate.PoliciesToBeUndeployed) + + // Process undeployment actions + errorMessages, successfullyUndeployedPolicies := processPoliciesTobeUndeployed(undeployedPoliciesDict) + + return errorMessages, successfullyUndeployedPolicies + +} + +// ExtractUndeployedPolicies extracts policy names and versions into a map +func extractUndeployedPolicies(policies []model.ToscaConceptIdentifier) map[string]string { + undeployedPoliciesDict := make(map[string]string) + for _, policy := range policies { + undeployedPoliciesDict[policy.Name] = policy.Version + log.Infof("Extracted Policy Name: %s, Version: %s for undeployment", policy.Name, policy.Version) + } + return undeployedPoliciesDict +} + +// HandlePolicyUndeployment processes the actual undeployment actions for a policy +func policyUndeploymentAction(policy map[string]interface{}) []string { + var failureMessages []string + + // Delete "policy" sdk and directories + policyErrors := removePolicyFromSdkandDir(policy) + failureMessages = append(failureMessages, policyErrors...) + + // Delete "data" sdk and directories + dataErrors := removeDataFromSdkandDir(policy) + failureMessages = append(failureMessages, dataErrors...) + + return failureMessages +} + +// removeDataFromSdkandDir handles the "data" directories in the policy +func removeDataFromSdkandDir(policy map[string]interface{}) []string { + var failureMessages []string + + if dataKeys, ok := policy["data"].([]interface{}); ok { + for _, dataKey := range dataKeys { + keyPath := "/" + strings.Replace(dataKey.(string), ".", "/", -1) + log.Debugf("Deleting data from OPA at keypath: %s", keyPath) + if err := opasdk.DeleteData(context.Background(), keyPath); err != nil { + failureMessages = append(failureMessages, err.Error()) + continue + } + if err := removeDataDirectory(keyPath); err != nil { + failureMessages = append(failureMessages, err.Error()) + } + } + } else { + failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'data' is missing or not an array", policy["policy-id"], policy["policy-version"])) + } + + return failureMessages +} + +// removePolicyFromSdkandDir handles the "policy" directories in the policy +func removePolicyFromSdkandDir(policy map[string]interface{}) []string { + var failureMessages []string + + if policyKeys, ok := policy["policy"].([]interface{}); ok { + for _, policyKey := range policyKeys { + keyPath := "/" + strings.Replace(policyKey.(string), ".", "/", -1) + if err := opasdk.DeletePolicy(context.Background(), policyKey.(string)); err != nil { + failureMessages = append(failureMessages, err.Error()) + continue + } + if err := removePolicyDirectory(keyPath); err != nil { + failureMessages = append(failureMessages, err.Error()) + } + } + } else { + failureMessages = append(failureMessages, fmt.Sprintf("%s:%s Invalid JSON structure: 'policy' is missing or not an array", policy["policy-id"], policy["policy-version"])) + } + + return failureMessages +} + +// RemoveDataDirectory removes a directory for data +func removeDataDirectory(dataKey string) error { + dataPath := filepath.Join(consts.Data, dataKey) + log.Debugf("Removing data directory: %s", dataPath) + if err := utils.RemoveDirectory(dataPath); err != nil { + return fmt.Errorf("Failed to handle directory for data %s: %v", dataPath, err) + } + return nil +} + +// RemovePolicyDirectory removes a directory for policies +func removePolicyDirectory(policyKey string) error { + policyPath := filepath.Join(consts.Policies, policyKey) + log.Debugf("Removing policy directory: %s", policyPath) + if err := utils.RemoveDirectory(policyPath); err != nil { + return fmt.Errorf("Failed to handle directory for policy %s: %v", policyPath, err) + } + return nil +} + +// findDeployedPolicy searches for a policy in deployedPolicies +func findDeployedPolicy(policyID, policyVersion string, deployedPolicies []map[string]interface{}) map[string]interface{} { + for _, policy := range deployedPolicies { + // Extract policy-id and policy-version from the deployed policy + id, idOk := policy["policy-id"].(string) + version, versionOk := policy["policy-version"].(string) + + // Check if the deployed policy matches the undeployed policy + if idOk && versionOk && id == policyID && version == policyVersion { + return policy + } + } + return nil +} diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go index 3d19e6c..ce7fc4c 100644 --- a/pkg/kafkacomm/pdp_topic_consumer.go +++ b/pkg/kafkacomm/pdp_topic_consumer.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -93,6 +93,9 @@ func NewKafkaConsumer() (*KafkaConsumer, error) { configMap.SetKey("sasl.username", username) configMap.SetKey("sasl.password", password) configMap.SetKey("security.protocol", "SASL_PLAINTEXT") + configMap.SetKey("fetch.max.bytes", 50*1024*1024) + configMap.SetKey("max.partition.fetch.bytes",50*1024*1024) + configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) configMap.SetKey("session.timeout.ms", "30000") configMap.SetKey("max.poll.interval.ms", "300000") configMap.SetKey("enable.partition.eof", true) diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go index 1518474..eb2db03 100644 --- a/pkg/kafkacomm/pdp_topic_consumer_test.go +++ b/pkg/kafkacomm/pdp_topic_consumer_test.go @@ -20,16 +20,16 @@ package kafkacomm import ( + "bou.ke/monkey" "errors" - "policy-opa-pdp/pkg/kafkacomm/mocks" - "testing" - "sync" - "fmt" + "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "policy-opa-pdp/cfg" - "bou.ke/monkey" + "policy-opa-pdp/pkg/kafkacomm/mocks" + "sync" + "testing" ) var kafkaConsumerFactory = kafka.NewConsumer @@ -175,34 +175,34 @@ func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) { } -//Helper function to reset +// Helper function to reset func resetKafkaConsumerSingleton() { - consumerOnce = sync.Once{} - consumerInstance = nil + consumerOnce = sync.Once{} + consumerInstance = nil } -//Test for mock error creating consumers +// Test for mock error creating consumers func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) { - resetKafkaConsumerSingleton() - monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) { - return nil, fmt.Errorf("mock error creating consumer") - }) - defer monkey.Unpatch(kafka.NewConsumer) - - consumer, err := NewKafkaConsumer() - assert.Nil(t, consumer) - assert.EqualError(t, err, "Kafka Consumer instance not created") + resetKafkaConsumerSingleton() + monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) { + return nil, fmt.Errorf("mock error creating consumer") + }) + defer monkey.Unpatch(kafka.NewConsumer) + + consumer, err := NewKafkaConsumer() + assert.Nil(t, consumer) + assert.EqualError(t, err, "Kafka Consumer instance not created") } // Test for error creating kafka instance func TestNewKafkaConsumer_NilConsumer(t *testing.T) { - resetKafkaConsumerSingleton() - monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) { - return nil, nil - }) - defer monkey.Unpatch(kafka.NewConsumer) - - consumer, err := NewKafkaConsumer() - assert.Nil(t, consumer) - assert.EqualError(t, err, "Kafka Consumer instance not created") + resetKafkaConsumerSingleton() + monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) { + return nil, nil + }) + defer monkey.Unpatch(kafka.NewConsumer) + + consumer, err := NewKafkaConsumer() + assert.Nil(t, consumer) + assert.EqualError(t, err, "Kafka Consumer instance not created") } diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go index 42a5e70..0f68840 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go @@ -30,6 +30,7 @@ import ( "policy-opa-pdp/pkg/model" "policy-opa-pdp/pkg/pdpattributes" "policy-opa-pdp/pkg/pdpstate" + "policy-opa-pdp/pkg/policymap" "sync" "time" ) @@ -43,13 +44,17 @@ var ( // Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP. func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { - if intervalMs <= 0 { + mu.Lock() + defer mu.Unlock() + + if intervalMs < 0 { log.Errorf("Invalid interval provided: %d. Interval must be greater than zero.", intervalMs) ticker = nil return + } else if intervalMs == 0 { + intervalMs = currentInterval + } - mu.Lock() - defer mu.Unlock() if ticker != nil && intervalMs == currentInterval { log.Debug("Ticker is already running") @@ -59,7 +64,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { if ticker != nil { ticker.Stop() } - // StopTicker() + currentInterval = intervalMs ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond) @@ -89,12 +94,23 @@ func sendPDPHeartBeat(s PdpStatusSender) error { Healthy: model.Healthy, Name: pdpattributes.PdpName, Description: "Pdp heartbeat", + Policies: []model.ToscaConceptIdentifier{}, PdpGroup: consts.PdpGroup, PdpSubgroup: &pdpattributes.PdpSubgroup, } pdpStatus.RequestID = uuid.New().String() pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + policiesMap := policymap.LastDeployedPolicies + + if policiesMap != "" { + if (policymap.ExtractDeployedPolicies(policiesMap)) == nil { + log.Warnf("No Policies extracted from Policy Map") + } else { + pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap) + } + } + err := s.SendPdpStatus(pdpStatus) log.Debugf("Sending Heartbeat ...") if err != nil { diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go index ba72c77..c3676c8 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go @@ -25,8 +25,7 @@ import ( "github.com/stretchr/testify/mock" "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" "testing" - ) - +) /* Success Case 1 diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go index 5525f61..c726b2b 100644 --- a/pkg/kafkacomm/publisher/pdp-pap-registration.go +++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go @@ -54,7 +54,6 @@ func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error { log.Warnf("failed to marshal PdpStatus to JSON: %v", err) return err } - log.Debugf("Producer saved in RealPdp StatusSender") kafkaMessage := &kafka.Message{ TopicPartition: kafka.TopicPartition{ diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go index 84013cb..8618eeb 100644 --- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go +++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go @@ -22,14 +22,14 @@ package publisher import ( "errors" "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "time" - "github.com/google/uuid" "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" - "github.com/confluentinc/confluent-kafka-go/kafka" "policy-opa-pdp/pkg/model" "testing" + "time" ) type MockPdpStatusSender struct { @@ -140,4 +140,3 @@ func TestSendPdpStatus_Failure(t *testing.T) { // Verify that the Produce method was called exactly once mockProducer.AssertExpectations(t) } - diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go index 4a13b1c..3e25780 100644 --- a/pkg/kafkacomm/publisher/pdp-status-publisher.go +++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go @@ -29,6 +29,7 @@ import ( "policy-opa-pdp/pkg/model" "policy-opa-pdp/pkg/pdpattributes" "policy-opa-pdp/pkg/pdpstate" + "policy-opa-pdp/pkg/policymap" "time" "github.com/google/uuid" @@ -36,10 +37,10 @@ import ( // Sends a PDP_STATUS message to indicate the successful processing of a PDP_UPDATE request // received from the Policy Administration Point (PAP). -func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error { +func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error { responseStatus := model.Success - responseMessage := "PDP Update was Successful" + responseMessage := resMessage pdpStatus := model.PdpStatus{ MessageType: model.PDP_STATUS, @@ -50,7 +51,7 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error Description: "Pdp Status Response Message For Pdp Update", PdpGroup: consts.PdpGroup, PdpSubgroup: &pdpattributes.PdpSubgroup, - // Policies: [], + Policies: []model.ToscaConceptIdentifier{}, PdpResponse: &model.PdpResponseDetails{ ResponseTo: &pdpUpdate.RequestId, ResponseStatus: &responseStatus, @@ -61,6 +62,16 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error pdpStatus.RequestID = uuid.New().String() pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + policiesMap := policymap.LastDeployedPolicies + + if policiesMap != "" { + if (policymap.ExtractDeployedPolicies(policiesMap)) == nil { + log.Warnf("No Policies extracted from Policy Map") + } else { + pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap) + } + } + log.Infof("Sending PDP Status With Update Response") err := s.SendPdpStatus(pdpStatus) @@ -73,6 +84,53 @@ func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error } +func SendPdpUpdateErrorResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate, err error) error { + + responseStatus := model.Failure + responseMessage := fmt.Sprintf("%v", err) + + pdpStatus := model.PdpStatus{ + MessageType: model.PDP_STATUS, + PdpType: consts.PdpType, + State: pdpstate.State, + Healthy: model.Healthy, + Name: pdpattributes.PdpName, + Description: "Pdp Status Response Message For Pdp Update", + PdpGroup: consts.PdpGroup, + PdpSubgroup: &pdpattributes.PdpSubgroup, + Policies: []model.ToscaConceptIdentifier{}, + PdpResponse: &model.PdpResponseDetails{ + ResponseTo: &pdpUpdate.RequestId, + ResponseStatus: &responseStatus, + ResponseMessage: &responseMessage, + }, + } + + pdpStatus.RequestID = uuid.New().String() + pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + + policiesMap := policymap.LastDeployedPolicies + + if policiesMap != "" { + if (policymap.ExtractDeployedPolicies(policiesMap)) == nil { + log.Warnf("No Policies extracted from Policy Map") + } else { + pdpStatus.Policies = policymap.ExtractDeployedPolicies(policiesMap) + } + } + + log.Infof("Sending PDP Status With Update Error Response") + + err = s.SendPdpStatus(pdpStatus) + if err != nil { + log.Warnf("Failed to send PDP Update Message : %v", err) + return err + } + + return nil + +} + // Sends a PDP_STATUS message to indicate a state change in the PDP (e.g., from PASSIVE to ACTIVE). func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateChange) error { @@ -87,7 +145,7 @@ func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateCh Description: "Pdp Status Response Message to Pdp State Change", PdpGroup: consts.PdpGroup, PdpSubgroup: &pdpattributes.PdpSubgroup, - // Policies: [], + Policies: []model.ToscaConceptIdentifier{}, PdpResponse: &model.PdpResponseDetails{ ResponseTo: &pdpStateChange.RequestId, ResponseStatus: &responseStatus, diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go index 83154ca..17805fe 100644 --- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go +++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go @@ -35,7 +35,7 @@ func TestSendPdpUpdateResponse_Success(t *testing.T) { mockSender.On("SendPdpStatus", mock.Anything).Return(nil) pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"} - err := SendPdpUpdateResponse(mockSender, pdpUpdate) + err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Successful") assert.NoError(t, err) mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything) } @@ -48,7 +48,7 @@ func TestSendPdpUpdateResponse_Failure(t *testing.T) { pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"} - err := SendPdpUpdateResponse(mockSender, pdpUpdate) + err := SendPdpUpdateResponse(mockSender, pdpUpdate, "PDPUpdate Failure") assert.Error(t, err) diff --git a/pkg/metrics/counters.go b/pkg/metrics/counters.go index e126554..7734cff 100644 --- a/pkg/metrics/counters.go +++ b/pkg/metrics/counters.go @@ -25,12 +25,13 @@ import "sync" var TotalErrorCount int64 var DecisionSuccessCount int64 var DecisionFailureCount int64 +var DeployFailureCount int64 +var DeploySuccessCount int64 +var UndeployFailureCount int64 +var UndeploySuccessCount int64 +var TotalPoliciesCount int64 var mu sync.Mutex - - - - // Increment counter func IncrementTotalErrorCount() { mu.Lock() @@ -39,7 +40,7 @@ func IncrementTotalErrorCount() { } // returns pointer to the counter -func TotalErrorCountRef() *int64 { +func totalErrorCountRef() *int64 { mu.Lock() defer mu.Unlock() return &TotalErrorCount @@ -53,7 +54,7 @@ func IncrementDecisionSuccessCount() { } // returns pointer to the counter -func TotalDecisionSuccessCountRef() *int64 { +func totalDecisionSuccessCountRef() *int64 { mu.Lock() defer mu.Unlock() return &DecisionSuccessCount @@ -74,3 +75,84 @@ func TotalDecisionFailureCountRef() *int64 { return &DecisionFailureCount } + +// Increment counter +func IncrementDeploySuccessCount() { + mu.Lock() + DeploySuccessCount++ + mu.Unlock() +} + +// returns pointer to the counter + +func totalDeploySuccessCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &DeploySuccessCount + +} + +// Increment counter +func IncrementDeployFailureCount() { + mu.Lock() + DeployFailureCount++ + mu.Unlock() +} + +// returns pointer to the counter + +func totalDeployFailureCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &DeployFailureCount + +} + + +// Increment counter +func IncrementUndeploySuccessCount() { + mu.Lock() + UndeploySuccessCount++ + mu.Unlock() +} + +// returns pointer to the counter + +func totalUndeploySuccessCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &UndeploySuccessCount + +} + +// Increment counter +func IncrementUndeployFailureCount() { + mu.Lock() + UndeployFailureCount++ + mu.Unlock() +} + +// returns pointer to the counter + +func totalUndeployFailureCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &UndeployFailureCount + +} + +// Increment counter +func SetTotalPoliciesCount(newCount int64) { + mu.Lock() + TotalPoliciesCount = newCount + mu.Unlock() +} + +// returns pointer to the counter + +func totalPoliciesCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &TotalPoliciesCount +} + diff --git a/pkg/metrics/counters_test.go b/pkg/metrics/counters_test.go index ba8646b..852e365 100644 --- a/pkg/metrics/counters_test.go +++ b/pkg/metrics/counters_test.go @@ -39,7 +39,7 @@ func TestCounters(t *testing.T) { }() } wg.Wait() - assert.Equal(t, int64(5), *TotalErrorCountRef()) + assert.Equal(t, int64(5), *totalErrorCountRef()) // Test IncrementQuerySuccessCount and TotalQuerySuccessCountRef @@ -61,7 +61,7 @@ func TestCounters(t *testing.T) { wg.Wait() - assert.Equal(t, int64(7), *TotalDecisionSuccessCountRef()) + assert.Equal(t, int64(7), *totalDecisionSuccessCountRef()) // Test IncrementDecisionFailureCount and TotalDecisionFailureCountRef diff --git a/pkg/metrics/statistics-provider.go b/pkg/metrics/statistics-provider.go index 9b34839..0f826bc 100644 --- a/pkg/metrics/statistics-provider.go +++ b/pkg/metrics/statistics-provider.go @@ -57,20 +57,20 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) { var statReport oapicodegen.StatisticsReport - statReport.DecisionSuccessCount = TotalDecisionSuccessCountRef() + + statReport.DecisionSuccessCount = totalDecisionSuccessCountRef() statReport.DecisionFailureCount = TotalDecisionFailureCountRef() - statReport.TotalErrorCount = TotalErrorCountRef() + statReport.TotalErrorCount = totalErrorCountRef() + statReport.DeployFailureCount = totalDeployFailureCountRef() + statReport.DeploySuccessCount = totalDeploySuccessCountRef() + statReport.UndeployFailureCount = totalUndeployFailureCountRef() + statReport.UndeploySuccessCount = totalUndeploySuccessCountRef() + statReport.TotalPoliciesCount = totalPoliciesCountRef() // not implemented hardcoding the values to zero // will be implemeneted in phase-2 - zerovalue := int64(0) onevalue := int64(1) - statReport.TotalPoliciesCount = &zerovalue statReport.TotalPolicyTypesCount = &onevalue - statReport.DeployFailureCount = &zerovalue - statReport.DeploySuccessCount = &zerovalue - statReport.UndeployFailureCount = &zerovalue - statReport.UndeploySuccessCount = &zerovalue value := int32(200) statReport.Code = &value diff --git a/pkg/model/mesages.go b/pkg/model/mesages.go index 269f45f..966cd46 100644 --- a/pkg/model/mesages.go +++ b/pkg/model/mesages.go @@ -86,11 +86,11 @@ type PdpStatus struct { // models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/PdpUpdate.java type PdpUpdate struct { Source string `json:"source" validate:"required"` - PdpHeartbeatIntervalMs int64 `json:"pdpHeartbeatIntervalMs" validate:"required"` + PdpHeartbeatIntervalMs int64 `json:"pdpHeartbeatIntervalMs"` MessageType string `json:"messageName" validate:"required"` - PoliciesToBeDeloyed []string `json:"policiesToBeDeployed" validate:"required"` - policiesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"` - Name string `json:"name" validate:"required"` + PoliciesToBeDeployed []ToscaPolicy `json:"policiesToBeDeployed" validate:"required"` + PoliciesToBeUndeployed []ToscaConceptIdentifier `json:"policiesToBeUndeployed"` + Name string `json:"name"` TimestampMs int64 `json:"timestampMs" validate:"required"` PdpGroup string `json:"pdpGroup" validate:"required"` PdpSubgroup string `json:"pdpSubgroup" validate:"required"` diff --git a/pkg/model/messages_test.go b/pkg/model/messages_test.go index 217cd2a..536f683 100644 --- a/pkg/model/messages_test.go +++ b/pkg/model/messages_test.go @@ -114,8 +114,8 @@ func (p *PdpUpdate) Validate() error { if p.MessageType == "" { return errors.New("MessageType is required") } - if len(p.PoliciesToBeDeloyed) == 0 { - return errors.New("PoliciesToBeDeloyed is required and must contain at least one policy") + if len(p.PoliciesToBeDeployed) == 0 { + return errors.New("PoliciesToBeDeployed is required and must contain at least one policy") } if p.Name == "" { return errors.New("Name is required") @@ -138,11 +138,52 @@ func (p *PdpUpdate) Validate() error { // TestPdpUpdateSerialization_Positive tests the successful serialization of PdpUpdate. func TestPdpUpdateSerialization_Success(t *testing.T) { + policies := []ToscaPolicy{ + { + Type: "onap.policies.native.opa", + TypeVersion: "1.0", + Properties: PolicyProperties{ + Data: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Policy: map[string]string{ + "policyKey1": "policyValue1", + }, + }, + Name: "MySecurityPolicy", + Version: "1.0.0", + Metadata: Metadata{ + PolicyID: "policy-id-001", + PolicyVersion: "1.0", + }, + }, + { + Type: "onap.policies.native.opa", + TypeVersion: "1.0", + Properties: PolicyProperties{ + Data: map[string]string{ + "threshold": "75", + "duration": "30", + }, + Policy: map[string]string{ + "policyKey2": "policyValue2", + }, + }, + Name: "MyPerformancePolicy", + Version: "1.0.0", + Metadata: Metadata{ + PolicyID: "policy-id-002", + PolicyVersion: "1.0", + }, + }, + } + pdpUpdate := PdpUpdate{ Source: "source1", PdpHeartbeatIntervalMs: 5000, MessageType: "PDP_UPDATE", - PoliciesToBeDeloyed: []string{"policy1", "policy2"}, + PoliciesToBeDeployed: policies, Name: "ExamplePDP", TimestampMs: 1633017600000, PdpGroup: "Group1", @@ -162,7 +203,7 @@ func TestPdpUpdateSerialization_Failure(t *testing.T) { Source: "", PdpHeartbeatIntervalMs: 5000, MessageType: "", - PoliciesToBeDeloyed: nil, + PoliciesToBeDeployed: nil, Name: "", TimestampMs: 0, PdpGroup: "", @@ -264,7 +305,6 @@ func TestPdpHealthStatusEnum(t *testing.T) { } - // TestPdpMessageType_String_Success validates the string representation of valid PdpMessageType values. func TestPdpMessageType_String_Success(t *testing.T) { diff --git a/pkg/model/toscaconceptidentifier.go b/pkg/model/toscaconceptidentifier.go index c9d7788..708442c 100644 --- a/pkg/model/toscaconceptidentifier.go +++ b/pkg/model/toscaconceptidentifier.go @@ -26,8 +26,8 @@ import ( ) type ToscaConceptIdentifier struct { - Name string - Version string + Name string `json:"name"` + Version string `json:"version"` } func NewToscaConceptIdentifier(name, version string) *ToscaConceptIdentifier { diff --git a/pkg/model/toscapolicy.go b/pkg/model/toscapolicy.go new file mode 100644 index 0000000..651ab17 --- /dev/null +++ b/pkg/model/toscapolicy.go @@ -0,0 +1,43 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== + +// hold the possible values for state of PDP. +// https://github.com/onap/policy-models/blob/master/models-pdp +// models-pdp/src/main/java/org/onap/policy/models/pdp/enums/PdpState.java +package model + +import () + +type ToscaPolicy struct { + Type string `json:"type"` + TypeVersion string `json:"type_version"` + Properties PolicyProperties `json:"properties"` + Name string `json:"name"` + Version string `json:"version"` + Metadata Metadata `json:"metadata"` +} + +type PolicyProperties struct { + Data map[string]string `json:"data"` + Policy map[string]string `json:"policy"` +} + +type Metadata struct { + PolicyID string `json:"policy-id"` + PolicyVersion string `json:"policy-version"` +} diff --git a/pkg/opasdk/opasdk.go b/pkg/opasdk/opasdk.go index 51a34e7..81b94ce 100644 --- a/pkg/opasdk/opasdk.go +++ b/pkg/opasdk/opasdk.go @@ -27,18 +27,22 @@ import ( "context" "fmt" "io" + "net/http" "os" "policy-opa-pdp/consts" "policy-opa-pdp/pkg/log" "sync" "github.com/open-policy-agent/opa/sdk" + "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/inmem" ) // Define the structs var ( opaInstance *sdk.OPA //A singleton instance of the OPA object once sync.Once //A sync.Once variable used to ensure that the OPA instance is initialized only once, + memStore storage.Store ) // reads JSON configuration from a file and return a jsonReader @@ -65,9 +69,11 @@ func GetOPASingletonInstance() (*sdk.OPA, error) { var err error once.Do(func() { var opaErr error + memStore = inmem.New() opaInstance, opaErr = sdk.New(context.Background(), sdk.Options{ // Configure your OPA instance here V1Compatible: true, + Store: memStore, }) log.Debugf("Create an instance of OPA Object") if opaErr != nil { @@ -88,6 +94,158 @@ func GetOPASingletonInstance() (*sdk.OPA, error) { }) } }) - return opaInstance, err } + +func UpsertPolicy(ctx context.Context, policyID string, policyContent []byte) error { + txn, err := memStore.NewTransaction(ctx, storage.WriteParams) + if err != nil { + log.Warnf("Error creating transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.UpsertPolicy(ctx, txn, policyID, policyContent) + if err != nil { + log.Warnf("Error inserting policy: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.Commit(ctx, txn) + if err != nil { + log.Warnf("Error commiting the transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + return nil +} + +func DeletePolicy(ctx context.Context, policyID string) error { + txn, err := memStore.NewTransaction(ctx, storage.WriteParams) + if err != nil { + log.Warnf("Error creating transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.DeletePolicy(ctx, txn, policyID) + if err != nil { + log.Warnf("Error deleting policy: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.Commit(ctx, txn) + if err != nil { + log.Warnf("Error commiting the transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + return nil +} + +func WriteData(ctx context.Context, dataPath string, data interface{}) error { + txn, err := memStore.NewTransaction(ctx, storage.WriteParams) + if err != nil { + log.Warnf("Error creating transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + + // Initialize the path if it doesn't exist + err = initializePath(ctx, txn, dataPath) + if err != nil { + log.Warnf("Error initializling Path : %s", dataPath) + log.Warnf("Error : %s", err) + memStore.Abort(ctx, txn) + return err + } + + err = memStore.Write(ctx, txn, storage.AddOp, storage.MustParsePath(dataPath), data) + if err != nil { + log.Warnf("Error Adding data: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.Commit(ctx, txn) + if err != nil { + log.Warnf("Error commiting the transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + return nil + +} + +func DeleteData(ctx context.Context, dataPath string) error { + txn, err := memStore.NewTransaction(ctx, storage.WriteParams) + if err != nil { + log.Warnf("Error creating transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath(dataPath), nil) + if err != nil { + log.Warnf("Error deleting data: %s", err) + memStore.Abort(ctx, txn) + return err + } + err = memStore.Commit(ctx, txn) + if err != nil { + log.Warnf("Error commiting the transaction: %s", err) + memStore.Abort(ctx, txn) + return err + } + return nil +} + +func ListPolicies(res http.ResponseWriter, req *http.Request) { + ctx := context.Background() + rtxn, err := memStore.NewTransaction(ctx, storage.TransactionParams{Write: false}) + if err != nil { + log.Warnf("Error creating transaction %s", err) + memStore.Abort(ctx, rtxn) + http.Error(res, err.Error(), http.StatusInternalServerError) + return + } + policies, err := memStore.ListPolicies(ctx, rtxn) + if err != nil { + log.Warnf("Error ListPolicies %s", err) + memStore.Abort(ctx, rtxn) + http.Error(res, err.Error(), http.StatusInternalServerError) + return + } + + for _, policyId := range policies { + log.Debugf("Policy ID: %s", policyId) + policy, err := memStore.GetPolicy(ctx, rtxn, policyId) + if err != nil { + log.Warnf("Error GetPolicy %s", err) + memStore.Abort(ctx, rtxn) + http.Error(res, err.Error(), http.StatusInternalServerError) + return + } + log.Debugf("Policy Content: %s\n", string(policy)) + } + memStore.Abort(ctx, rtxn) + res.WriteHeader(http.StatusOK) + res.Write([]byte("Check logs")) +} + +func initializePath(ctx context.Context, txn storage.Transaction, path string) error { + segments := storage.MustParsePath(path) + for i := 1; i <= len(segments); i++ { + subPath := segments[:i] + _, err := memStore.Read(ctx, txn, subPath) + if err != nil && storage.IsNotFound(err) { + // Create the intermediate path + log.Debugf("storage not found creating : %s", subPath.String()) + err = memStore.Write(ctx, txn, storage.AddOp, subPath, map[string]interface{}{}) + if err != nil { + log.Debugf("Error initializing path: %s", err) + return err + } + } else if err != nil { + log.Debugf("Error reading path: %s", err) + return err + } + } + return nil +} diff --git a/pkg/opasdk/opasdk_test.go b/pkg/opasdk/opasdk_test.go index 80d1a1e..2517376 100644 --- a/pkg/opasdk/opasdk_test.go +++ b/pkg/opasdk/opasdk_test.go @@ -20,43 +20,43 @@ package opasdk import ( + "bou.ke/monkey" + "context" "errors" + "fmt" + "github.com/open-policy-agent/opa/sdk" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "io" "os" "policy-opa-pdp/consts" - "testing" "sync" - "context" - "fmt" - "bou.ke/monkey" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/open-policy-agent/opa/sdk" + "testing" ) // Mock for os.Open type MockFile struct { - mock.Mock + mock.Mock } func (m *MockFile) Open(name string) (*os.File, error) { - args := m.Called(name) - return args.Get(0).(*os.File), args.Error(1) + args := m.Called(name) + return args.Get(0).(*os.File), args.Error(1) } // Mock for io.ReadAll func mockReadAll(r io.Reader) ([]byte, error) { - return []byte(`{"config": "test"}`), nil + return []byte(`{"config": "test"}`), nil } type MockSDK struct { - mock.Mock + mock.Mock } func (m *MockSDK) New(ctx context.Context, options sdk.Options) (*sdk.OPA, error) { - fmt.Print("Inside New Method") - args := m.Called(ctx, options) - return args.Get(0).(*sdk.OPA), args.Error(1) + fmt.Print("Inside New Method") + args := m.Called(ctx, options) + return args.Get(0).(*sdk.OPA), args.Error(1) } func TestGetOPASingletonInstance_ConfigurationFileNotexisting(t *testing.T) { @@ -89,20 +89,20 @@ func TestGetOPASingletonInstance_SingletonBehavior(t *testing.T) { } func TestGetOPASingletonInstance_ConfigurationFileLoaded(t *testing.T) { - tmpFile, err := os.CreateTemp("", "config.json") - if err != nil { - t.Fatalf("Failed to create temp file: %v", err) - } - defer os.Remove(tmpFile.Name()) + tmpFile, err := os.CreateTemp("", "config.json") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) - consts.OpasdkConfigPath = tmpFile.Name() + consts.OpasdkConfigPath = tmpFile.Name() - // Simulate OPA instance creation - opaInstance, err := GetOPASingletonInstance() + // Simulate OPA instance creation + opaInstance, err := GetOPASingletonInstance() - // Assertions - assert.Nil(t, err) - assert.NotNil(t, opaInstance) + // Assertions + assert.Nil(t, err) + assert.NotNil(t, opaInstance) } func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) { @@ -123,79 +123,78 @@ func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) { } func TestGetOPASingletonInstance_JSONReadError(t *testing.T) { - consts.OpasdkConfigPath = "/app/config/config.json" + consts.OpasdkConfigPath = "/app/config/config.json" - // Simulate an error in JSON read (e.g., corrupt file) - mockReadAll := func(r io.Reader) ([]byte, error) { - return nil, errors.New("Failed to read JSON file") - } + // Simulate an error in JSON read (e.g., corrupt file) + mockReadAll := func(r io.Reader) ([]byte, error) { + return nil, errors.New("Failed to read JSON file") + } - jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll) - assert.NotNil(t, err) - assert.Nil(t, jsonReader) + jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll) + assert.NotNil(t, err) + assert.Nil(t, jsonReader) } func TestGetOPASingletonInstance_ValidConfigFile(t *testing.T) { - tmpFile, err := os.CreateTemp("", "config.json") - if err != nil { - t.Fatalf("Failed to create temp file: %v", err) - } - defer os.Remove(tmpFile.Name()) + tmpFile, err := os.CreateTemp("", "config.json") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) - consts.OpasdkConfigPath = tmpFile.Name() + consts.OpasdkConfigPath = tmpFile.Name() - // Valid JSON content - validJSON := []byte(`{"config": "test"}`) - err = os.WriteFile(tmpFile.Name(), validJSON, 0644) - if err != nil { - t.Fatalf("Failed to write valid JSON to temp file: %v", err) - } + // Valid JSON content + validJSON := []byte(`{"config": "test"}`) + err = os.WriteFile(tmpFile.Name(), validJSON, 0644) + if err != nil { + t.Fatalf("Failed to write valid JSON to temp file: %v", err) + } - // Call the function - opaInstance, err := GetOPASingletonInstance() + // Call the function + opaInstance, err := GetOPASingletonInstance() - assert.Nil(t, err) - assert.NotNil(t, opaInstance) + assert.Nil(t, err) + assert.NotNil(t, opaInstance) } func TestGetJSONReader(t *testing.T) { - // Create a mock file - mockFile := new(MockFile) - mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil) + // Create a mock file + mockFile := new(MockFile) + mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil) - // Call the function with mock functions - jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll) + // Call the function with mock functions + jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll) - // Check the results - assert.NoError(t, err) - assert.NotNil(t, jsonReader) + // Check the results + assert.NoError(t, err) + assert.NotNil(t, jsonReader) - // Check the content of the jsonReader - expectedContent := `{"config": "test"}` - actualContent := make([]byte, len(expectedContent)) - jsonReader.Read(actualContent) - assert.Equal(t, expectedContent, string(actualContent)) + // Check the content of the jsonReader + expectedContent := `{"config": "test"}` + actualContent := make([]byte, len(expectedContent)) + jsonReader.Read(actualContent) + assert.Equal(t, expectedContent, string(actualContent)) - // Assert that the mock methods were called - mockFile.AssertCalled(t, "Open", "/app/config/config.json") + // Assert that the mock methods were called + mockFile.AssertCalled(t, "Open", "/app/config/config.json") } func TestGetJSONReader_ReadAllError(t *testing.T) { - mockFile := new(MockFile) - mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil) + mockFile := new(MockFile) + mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil) - // Simulate ReadAll error - jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) { - return nil, io.ErrUnexpectedEOF - }) + // Simulate ReadAll error + jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) { + return nil, io.ErrUnexpectedEOF + }) - assert.Error(t, err) - assert.Nil(t, jsonReader) + assert.Error(t, err) + assert.Nil(t, jsonReader) - mockFile.AssertCalled(t, "Open", "/app/config/config.json") + mockFile.AssertCalled(t, "Open", "/app/config/config.json") } - func TestGetOPASingletonInstance(t *testing.T) { // Call your function under test opaInstance, err := GetOPASingletonInstance() @@ -210,7 +209,6 @@ func TestGetOPASingletonInstance(t *testing.T) { assert.NotNil(t, opaInstance, "OPA instance should be nil when sdk.New fails") } - // Helper to reset the singleton for testing func resetSingleton() { opaInstance = nil diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go index 8ce738b..005dd03 100644 --- a/pkg/pdpattributes/pdpattributes.go +++ b/pkg/pdpattributes/pdpattributes.go @@ -33,12 +33,12 @@ var ( ) func init() { - PdpName = GenerateUniquePdpName() + PdpName = generateUniquePdpName() log.Debugf("Name: %s", PdpName) } // Generates a unique PDP name by appending a randomly generated UUID -func GenerateUniquePdpName() string { +func generateUniquePdpName() string { return "opa-" + uuid.New().String() } @@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) { } // Retrieves the current PDP subgroup value. -func GetPdpSubgroup() string { +func getPdpSubgroup() string { return PdpSubgroup } @@ -58,7 +58,7 @@ func SetPdpHeartbeatInterval(pdpHeartbeatInterval int64) { } // Retrieves the current PDP heartbeat interval value. -func GetPdpHeartbeatInterval() int64 { +func getPdpHeartbeatInterval() int64 { return PdpHeartbeatInterval } diff --git a/pkg/pdpattributes/pdpattributes_test.go b/pkg/pdpattributes/pdpattributes_test.go index 909b4ee..c9a41c7 100644 --- a/pkg/pdpattributes/pdpattributes_test.go +++ b/pkg/pdpattributes/pdpattributes_test.go @@ -28,15 +28,15 @@ import ( func TestGenerateUniquePdpName_Success(t *testing.T) { t.Run("GenerateValidPdpName", func(t *testing.T) { - pdpName := GenerateUniquePdpName() + pdpName := generateUniquePdpName() assert.Contains(t, pdpName, "opa-", "Expected PDP name to start with 'opa-'") }) } func TestGenerateUniquePdpName_Failure(t *testing.T) { t.Run("UniqueNamesCheck", func(t *testing.T) { - pdpName1 := GenerateUniquePdpName() - pdpName2 := GenerateUniquePdpName() + pdpName1 := generateUniquePdpName() + pdpName2 := generateUniquePdpName() assert.NotEqual(t, pdpName1, pdpName2, "Expected different UUID for each generated PDP name") assert.Len(t, pdpName1, len("opa-")+36, "Expected length of PDP name to match 'opa-<UUID>' format") }) @@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) { t.Run("ValidSubgroup", func(t *testing.T) { expectedSubgroup := "subgroup1" SetPdpSubgroup(expectedSubgroup) - assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value") + assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value") }) } func TestSetPdpSubgroup_Failure(t *testing.T) { t.Run("EmptySubgroup", func(t *testing.T) { SetPdpSubgroup("") - assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string") + assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string") }) t.Run("LargeSubgroup", func(t *testing.T) { @@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) { largeSubgroup[i] = 'a' } SetPdpSubgroup(string(largeSubgroup)) - assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value") + assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value") }) } @@ -70,19 +70,19 @@ func TestSetPdpHeartbeatInterval_Success(t *testing.T) { t.Run("ValidHeartbeatInterval", func(t *testing.T) { expectedInterval := int64(30) SetPdpHeartbeatInterval(expectedInterval) - assert.Equal(t, expectedInterval, GetPdpHeartbeatInterval(), "Expected heartbeat interval to match set value") + assert.Equal(t, expectedInterval, getPdpHeartbeatInterval(), "Expected heartbeat interval to match set value") }) } func TestSetPdpHeartbeatInterval_Failure(t *testing.T) { t.Run("FailureHeartbeatInterval", func(t *testing.T) { SetPdpHeartbeatInterval(-10) - assert.Equal(t, int64(-10), GetPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values") + assert.Equal(t, int64(-10), getPdpHeartbeatInterval(), "Expected heartbeat interval to handle negative values") }) t.Run("LargeHeartbeatInterval", func(t *testing.T) { largeInterval := int64(time.Hour * 24 * 365 * 10) // 10 years in seconds SetPdpHeartbeatInterval(largeInterval) - assert.Equal(t, largeInterval, GetPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values") + assert.Equal(t, largeInterval, getPdpHeartbeatInterval(), "Expected PDP heartbeat interval to handle large values") }) } diff --git a/pkg/policymap/policy_and_data_map.go b/pkg/policymap/policy_and_data_map.go new file mode 100644 index 0000000..5e06a59 --- /dev/null +++ b/pkg/policymap/policy_and_data_map.go @@ -0,0 +1,204 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== + +// will process the update message from pap and send the pdp status response. +package policymap + +import ( + "encoding/json" + "fmt" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" +) + +var ( + LastDeployedPolicies string +) + +func formatPolicyAndDataMap(deployedPolicies []map[string]interface{}) (string, error) { + + // Create the final JSON + finalMap := map[string]interface{}{ + "deployed_policies_dict": deployedPolicies, + } + + // Marshal the final map into JSON + policyDataJSON, err := FormatMapofAnyType(finalMap) + if err != nil { + return "", fmt.Errorf("failed to format json: %v", err) + } + + // Update global state + LastDeployedPolicies = policyDataJSON + log.Infof("PoliciesDeployed Map: %v", LastDeployedPolicies) + + return LastDeployedPolicies, nil +} + +func FormatMapofAnyType[T any](mapOfAnyType T) (string, error) { + // Marshal the final map into JSON + jsonBytes, err := json.MarshalIndent(mapOfAnyType, "", " ") + if err != nil { + return "", fmt.Errorf("failed to format json: %v", err) + } + + return string(jsonBytes), nil +} + +func UnmarshalLastDeployedPolicies(lastdeployedPolicies string) ([]map[string]interface{}, error) { + if len(lastdeployedPolicies) == 0 { + return []map[string]interface{}{}, nil + } + + var policiesMap struct { + DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"` + } + + err := json.Unmarshal([]byte(lastdeployedPolicies), &policiesMap) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal LastDeployedPolicies: %v", err) + } + + return policiesMap.DeployedPoliciesDict, nil +} + +func UpdateDeployedPoliciesinMap(policy model.ToscaPolicy) (string, error) { + + // Unmarshal the last known policies + deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + } + + dataKeys := make([]string, 0, len(policy.Properties.Data)) + policyKeys := make([]string, 0, len(policy.Properties.Policy)) + + for key := range policy.Properties.Data { + dataKeys = append(dataKeys, key) + } + + for key := range policy.Properties.Policy { + policyKeys = append(policyKeys, key) + } + + directoryMap := map[string]interface{}{ + "policy-id": policy.Metadata.PolicyID, + "policy-version": policy.Metadata.PolicyVersion, + "data": dataKeys, + "policy": policyKeys, + } + deployedPolicies = append(deployedPolicies, directoryMap) + return formatPolicyAndDataMap(deployedPolicies) +} + +func RemoveUndeployedPoliciesfromMap(undeployedPolicy map[string]interface{}) (string, error) { + + // Unmarshal the last known policies + deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + } + + remainingPolicies := []map[string]interface{}{} + + for _, policy := range deployedPolicies { + shouldRetain := true + if policy["policy-id"] == undeployedPolicy["policy-id"] && policy["policy-version"] == undeployedPolicy["policy-version"] { + shouldRetain = false + } + if shouldRetain { + remainingPolicies = append(remainingPolicies, policy) + } + } + + return formatPolicyAndDataMap(remainingPolicies) +} + +func VerifyAndReturnPoliciesToBeDeployed(lastdeployedPoliciesMap string, pdpUpdate model.PdpUpdate) []model.ToscaPolicy { + type PoliciesMap struct { + DeployedPoliciesDict []map[string]interface{} `json:"deployed_policies_dict"` + } + + var policiesMap PoliciesMap + err := json.Unmarshal([]byte(lastdeployedPoliciesMap), &policiesMap) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + return pdpUpdate.PoliciesToBeDeployed + } + + deployedPolicies := policiesMap.DeployedPoliciesDict + var policiesToBeDeployed []model.ToscaPolicy + + for _, deployingPolicy := range pdpUpdate.PoliciesToBeDeployed { + shouldDeploy := true + for _, deployedPolicy := range deployedPolicies { + if deployedPolicy["policy-id"] == deployingPolicy.Name && deployedPolicy["policy-version"] == deployingPolicy.Version { + log.Infof("Policy Previously deployed: %v %v, skipping", deployingPolicy.Name, deployingPolicy.Version) + shouldDeploy = false + break + } + } + + if shouldDeploy { + log.Infof("Policy is new and should be deployed: %v %v", deployingPolicy.Name, deployingPolicy.Version) + policiesToBeDeployed = append(policiesToBeDeployed, deployingPolicy) + } + } + + return policiesToBeDeployed + +} + +func ExtractDeployedPolicies(policiesMap string) []model.ToscaConceptIdentifier { + + // Unmarshal the last known policies + deployedPolicies, err := UnmarshalLastDeployedPolicies(policiesMap) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + } + + pdpstatus := model.PdpStatus{ + Policies: []model.ToscaConceptIdentifier{}, + } + + for _, policy := range deployedPolicies { + + // Extract policy-id and policy-version + policyID, idOk := policy["policy-id"].(string) + policyVersion, versionOk := policy["policy-version"].(string) + if !idOk || !versionOk { + log.Warnf("Missing or invalid policy-id or policy-version") + return nil + } + tosca := model.ToscaConceptIdentifier{ + Name: policyID, + Version: policyVersion, + } + pdpstatus.Policies = append(pdpstatus.Policies, tosca) + } + return pdpstatus.Policies +} + +func GetTotalDeployedPoliciesCountFromMap() int { + deployedPolicies, err := UnmarshalLastDeployedPolicies(LastDeployedPolicies) + if err != nil { + log.Warnf("Failed to unmarshal LastDeployedPolicies: %v", err) + return 0 + } + return len(deployedPolicies) +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 9b405d5..313b9a6 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,14 @@ package utils import ( + "fmt" + "github.com/go-playground/validator/v10" "github.com/google/uuid" + "os" + "path/filepath" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "strings" ) // validates if the given request is in valid uuid form @@ -29,3 +36,181 @@ func IsValidUUID(u string) bool { _, err := uuid.Parse(u) return err == nil } + +// Helper function to create a directory if it doesn't exist +func CreateDirectory(dirPath string) error { + err := os.MkdirAll(dirPath, os.ModePerm) + if err != nil { + log.Errorf("Failed to create directory %s: %v", dirPath, err) + return err + } + log.Infof("Directory created: %s", dirPath) + return nil +} + +// Helper function to check and remove a directory +func RemoveDirectory(dirPath string) error { + + entries, err := os.ReadDir(dirPath) + if err != nil { + if os.IsNotExist(err) { + log.Warnf("Directory does not exist: %s", dirPath) + // Directory does not exist, nothing to do + return nil + } + return fmt.Errorf("failed to read directory: %w", err) + } + + for _, entry := range entries { + entryPath := filepath.Join(dirPath, entry.Name()) + + if entry.IsDir() { + // Check if the subdirectory is empty and delete it + isEmpty, err := isDirEmpty(entryPath) + if err != nil { + return err + } + if isEmpty { + log.Debugf("Removing empty subdirectory: %s", entryPath) + if err := os.RemoveAll(entryPath); err != nil { + return fmt.Errorf("failed to remove directory: %s, error: %w", entryPath, err) + } + } + } else { + // Delete specific files in the parent directory + if entry.Name() == "data.json" || entry.Name() == "policy.rego" { + log.Debugf("Removing file: %s", entryPath) + if err := os.Remove(entryPath); err != nil { + return fmt.Errorf("failed to remove file: %s, error: %w", entryPath, err) + } + } + } + } + + return nil +} + +// Helper function to check if a directory is empty +func isDirEmpty(dirPath string) (bool, error) { + entries, err := os.ReadDir(dirPath) + if err != nil { + return false, fmt.Errorf("failed to read directory: %s, error: %w", dirPath, err) + } + return len(entries) == 0, nil +} + +func ValidateFieldsStructs(pdpUpdate model.PdpUpdate) error { + //Initialize Validator and validate Struct after unmarshalling + validate := validator.New() + + err := validate.Struct(pdpUpdate) + if err != nil { + for _, err := range err.(validator.ValidationErrors) { + log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag()) + } + return err + } + return err +} + +// Validate validates the fields based on your requirements. +func ValidateToscaPolicyJsonFields(policy model.ToscaPolicy) error { + // 1. Validate that Name, Version, and Metadata fields match. + emphasize := "Validation emphasizes the condition" + if policy.Name != policy.Metadata.PolicyID { + return fmt.Errorf("policy name '%s' does not match metadata policy-id '%s', '%s'", policy.Name, policy.Metadata.PolicyID, emphasize) + } + if policy.Version != policy.Metadata.PolicyVersion { + return fmt.Errorf("policy version '%s' does not match metadata policy-version '%s', '%s'", policy.Version, policy.Metadata.PolicyVersion, emphasize) + } + + if policy.Properties.Data != nil { + // 2. Validate that Name is a suffix for keys in Properties.Data and Properties.Policy. + keySeen := make(map[string]bool) + for key := range policy.Properties.Data { + if keySeen[key] { + return fmt.Errorf("duplicate data key '%s' found, '%s'", key, emphasize) + } + keySeen[key] = true + if !strings.HasPrefix(key, "node." + policy.Name) { + return fmt.Errorf("data key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize) + } + } + } + keySeen := make(map[string]bool) + for key := range policy.Properties.Policy { + if keySeen[key] { + return fmt.Errorf("duplicate policy key '%s' found, '%s'", key, emphasize) + } + keySeen[key] = true + if !strings.HasPrefix(key, policy.Name) { + return fmt.Errorf("policy key '%s' does not have name '%s' as a prefix, '%s'", key, policy.Name, emphasize) + } + } + + return nil +} + +func IsPolicyNameAllowed(policy model.ToscaPolicy, deployedPolicies []map[string]interface{}) (bool, error) { + + policyID := policy.Name + + if policyID == "" { + return false, fmt.Errorf("Policy Name cannot be Empty") + } + + policyHierarchyLevel := strings.Split(policyID, ".") + + for _, deployedPolicy := range deployedPolicies { + deployedPolicyID, ok := deployedPolicy["policy-id"].(string) + if !ok { + return false, fmt.Errorf("Invalid or missing policy-id field") + } + + deployedPolicyIDHierarchyLevel := strings.Split(deployedPolicyID, ".") + + if isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) { + return false, fmt.Errorf("Policy Validation Failed : Policy-id: %s is parent of deployed policy, overrides existing policy: %s", policyID, deployedPolicyID) + } + + if isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel) { + return false, fmt.Errorf("Policy Validation Failed: Policy-id: %s is child of deployed policy , can overwrite existing policy: %s", policyID, deployedPolicyID) + + } + + } + + return true, nil +} + +func isParentOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool { + + // new policy should have fewer levels than deployed policy to be a parent + if len(policyHierarchyLevel) < len(deployedPolicyIDHierarchyLevel) { + for policyNameIndex := range policyHierarchyLevel { + if policyHierarchyLevel[policyNameIndex] != deployedPolicyIDHierarchyLevel[policyNameIndex] { + return false + } + } + return true + + } + + return false +} + +func isChildOfExistingPolicy(policyHierarchyLevel, deployedPolicyIDHierarchyLevel []string) bool { + + // new policy should have more levels than deployed policy to be a child + if len(policyHierarchyLevel) > len(deployedPolicyIDHierarchyLevel) { + for policyNameIndex := range deployedPolicyIDHierarchyLevel { + if deployedPolicyIDHierarchyLevel[policyNameIndex] != policyHierarchyLevel[policyNameIndex] { + return false + } + } + return true + + } + + return false +} diff --git a/test/README.md b/test/README.md index 96f4eed..bc9f931 100644 --- a/test/README.md +++ b/test/README.md @@ -1,5 +1,19 @@ # Testing OPA +## Curl URL For Deployment. + +1. `curl -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type":"application/yaml" --data-binary @test_resources/policy_deploy_single_policy.yaml http://localhost:30002/policy/api/v1/policytypes/onap.policies.native.opa/versions/1.0.0/policies` + +2. `curl -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type":"application/json" -d @test_resources/deploy.json http://localhost:30003/policy/pap/v1/pdps/policies` + +## Curl URL For Undeployment + +`curl -u 'policyadmin:zb!XztG34' -X DELETE http://localhost:30003/policy/pap/v1/pdps/policies/role/versions/1.0.0` , where role is the policy name. + +## Curl URL for Batch Undeployment. + +`curl -v -u 'policyadmin:zb!XztG34' -X POST -H "Content-Type":"application/json" -d @test_resources/undeploy_batch_delete.json http://localhost:30003/policy/pap/v1/pdps/deployments/batch` + ## Verification API Calls curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "11:34:56", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyFilter" : [""], "policyName":"example","input":{"method":"POST","path":["users"]}}' -X POST http://0.0.0.0:8282/policy/pdpo/v1/decision diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 273fd1d..a4dea24 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -97,13 +97,6 @@ services: - ./scripts.sh:/app/scripts.sh - ./Opagroup.json:/app/Opagroup.json - ./policy-new.yaml:/app/policy-new.yaml - - type: bind - source: ./policies - target: /opt/policies - - type: bind - source: ./data - target: /opt/data - environment: LOG_LEVEL: debug KAFKA_URL: "kafka:9092" diff --git a/test/test_resources/deploy.json b/test/test_resources/deploy.json new file mode 100644 index 0000000..2698286 --- /dev/null +++ b/test/test_resources/deploy.json @@ -0,0 +1,10 @@ +{ + "policies": [ + { + "policy-id": "role", + "policy-version": "1.0.0" + + } + ] +} + diff --git a/test/test_resources/deploy_collab.json b/test/test_resources/deploy_collab.json new file mode 100644 index 0000000..06d43c9 --- /dev/null +++ b/test/test_resources/deploy_collab.json @@ -0,0 +1,10 @@ +{ + "policies": [ + { + "policy-id": "collab", + "policy-version": "1.0.0" + + } + ] +} + diff --git a/test/test_resources/deploy_conflict.json b/test/test_resources/deploy_conflict.json new file mode 100644 index 0000000..72d2fcd --- /dev/null +++ b/test/test_resources/deploy_conflict.json @@ -0,0 +1,10 @@ +{ + "policies": [ + { + "policy-id": "conflict", + "policy-version": "1.0.0" + + } + ] +} + diff --git a/test/test_resources/deploy_zone.json b/test/test_resources/deploy_zone.json new file mode 100644 index 0000000..217457b --- /dev/null +++ b/test/test_resources/deploy_zone.json @@ -0,0 +1,11 @@ + +{ + "policies": [ + { + "policy-id": "zone", + "policy-version": "1.0.0" + + } + ] +} + diff --git a/test/test_resources/policy_collab.yaml b/test/test_resources/policy_collab.yaml new file mode 100644 index 0000000..e56bbed --- /dev/null +++ b/test/test_resources/policy_collab.yaml @@ -0,0 +1,18 @@ +tosca_definitions_version: tosca_simple_yaml_1_1_0 +topology_template: + policies: + - collab: + type: onap.policies.native.opa + type_version: 1.0.0 + properties: + data: + collab.action: ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZWRpdG9yIgogICAgICAgIF0sCiAgICAgICAgImNoYXJsaWUiOiBbCiAgICAgICAgICAgICJ2aWV3ZXIiCiAgICAgICAgXQogICAgfSwKICAgICJyb2xlX3Blcm1pc3Npb25zIjogewogICAgICAgICJhZG1pbiI6IHsKICAgICAgICAgICAgImFjdGlvbnMiOiBbCiAgICAgICAgICAgICAgICAicmVhZCIsCiAgICAgICAgICAgICAgICAid3JpdGUiLAogICAgICAgICAgICAgICAgImRlbGV0ZSIKICAgICAgICAgICAgXSwKICAgICAgICAgICAgInJlc291cmNlcyI6IFsKICAgICAgICAgICAgICAgICJzZXJ2ZXIiLAogICAgICAgICAgICAgICAgImRhdGFiYXNlIgogICAgICAgICAgICBdCiAgICAgICAgfSwKICAgICAgICAiZWRpdG9yIjogewogICAgICAgICAgICAiYWN0aW9ucyI6IFsKICAgICAgICAgICAgICAgICJyZWFkIiwKICAgICAgICAgICAgICAgICJ3cml0ZSIKICAgICAgICAgICAgXSwKICAgICAgICAgICAgInJlc291cmNlcyI6IFsKICAgICAgICAgICAgICAgICJzZXJ2ZXIiCiAgICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgICJ2aWV3ZXIiOiB7CiAgICAgICAgICAgICJhY3Rpb25zIjogWwogICAgICAgICAgICAgICAgInJlYWQiCiAgICAgICAgICAgIF0sCiAgICAgICAgICAgICJyZXNvdXJjZXMiOiBbCiAgICAgICAgICAgICAgICAic2VydmVyIgogICAgICAgICAgICBdCiAgICAgICAgfQogICAgfQp9 + policy: + collab.conflict: cGFja2FnZSBjb2xsYWIuY29uZmxpY3QKCmltcG9ydCByZWdvLnYxCgphbGxvdyBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CmRlbnkgaWYgeyBpbnB1dC5uYW1lID09ICJhbGljZSIgfQoKIyBkZW55IGV2ZXJ5dGhpbmcgYnkgZGVmYXVsdApkZWZhdWx0IGF1dGh6IDo9IGZhbHNlCgojIGRlbnkgb3ZlcnJpZGVzIGFsbG93CmF1dGh6IGlmIHsKICAgIGFsbG93CiAgICBub3QgZGVueQp9Cg== + collab.action: cGFja2FnZSBjb2xsYWIuYWN0aW9uCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIGFkbWluIHJvbGUgaXMgZ3JhbnRlZCBwZXJtaXNzaW9uIHRvIHBlcmZvcm0gdGhlIGFjdGlvbi4KYWxsb3cgaWYgewogICAgc29tZSBpCiAgICBkYXRhLmNvbGxhYi5hY3Rpb24udXNlcl9yb2xlc1tpbnB1dC51c2VyXVtpXSA9PSByb2xlCiAgICBzb21lIGoKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLmFjdGlvbnNbal0gPT0gaW5wdXQuYWN0aW9uCiAgICBzb21lIGsKICAgIGRhdGEuY29sbGFiLmFjdGlvbi5yb2xlX3Blcm1pc3Npb25zW3JvbGVdLnJlc291cmNlc1trXSA9PSBpbnB1dC50eXBlCn0KIyAgICAgICAqIFJlZ28gY29tcGFyaXNvbiB0byBvdGhlciBzeXN0ZW1zOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0L2NvbXBhcmlzb24tdG8tb3RoZXItc3lzdGVtcy8KIyAgICAgICAqIFJlZ28gSXRlcmF0aW9uOiBodHRwczovL3d3dy5vcGVucG9saWN5YWdlbnQub3JnL2RvY3MvbGF0ZXN0LyNpdGVyYXRpb24KCg== + collab: cGFja2FnZSBjb2xsYWIKCmltcG9ydCByZWdvLnYxCmltcG9ydCBkYXRhLmNvbGxhYi5jb25mbGljdAppbXBvcnQgZGF0YS5jb2xsYWIuYWN0aW9uCgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCmFsbG93IGlmIHsKICAgIGNvbmZsaWN0LmFsbG93CiAgICBhY3Rpb24uYWxsb3cKfQ== + name: collab + version: 1.0.0 + metadata: + policy-id: collab + policy-version: 1.0.0 diff --git a/test/test_resources/policy_conflict.yaml b/test/test_resources/policy_conflict.yaml new file mode 100644 index 0000000..0406179 --- /dev/null +++ b/test/test_resources/policy_conflict.yaml @@ -0,0 +1,15 @@ +tosca_definitions_version: tosca_simple_yaml_1_1_0 +topology_template: + policies: + - conflict: + type: onap.policies.native.opa + type_version: 1.0.0 + properties: + data: + policy: + conflict: cGFja2FnZSBjb25mbGljdAoKaW1wb3J0IHJlZ28udjEKCmFsbG93IGlmIHsgaW5wdXQubmFtZSA9PSAiYWxpY2UiIH0KZGVueSBpZiB7IGlucHV0Lm5hbWUgPT0gImFsaWNlIiB9CgojIGRlbnkgZXZlcnl0aGluZyBieSBkZWZhdWx0CmRlZmF1bHQgYXV0aHogOj0gZmFsc2UKCiMgZGVueSBvdmVycmlkZXMgYWxsb3cKYXV0aHogaWYgewogICAgYWxsb3cKICAgIG5vdCBkZW55Cn0KCg== + name: conflict + version: 1.0.0 + metadata: + policy-id: conflict + policy-version: 1.0.0 diff --git a/test/test_resources/policy_deploy_single_policy.yaml b/test/test_resources/policy_deploy_single_policy.yaml new file mode 100644 index 0000000..872b6c6 --- /dev/null +++ b/test/test_resources/policy_deploy_single_policy.yaml @@ -0,0 +1,17 @@ +tosca_definitions_version: tosca_simple_yaml_1_1_0 +topology_template: + policies: + - role: + type: onap.policies.native.opa + type_version: 1.0.0 + properties: + data: + role: ewogICAgInVzZXJfcm9sZXMiOiB7CiAgICAgICAgImFsaWNlIjogWwogICAgICAgICAgICAiYWRtaW4iCiAgICAgICAgXSwKICAgICAgICAiYm9iIjogWwogICAgICAgICAgICAiZW1wbG95ZWUiLAogICAgICAgICAgICAiYmlsbGluZyIKICAgICAgICBdLAogICAgICAgICJldmUiOiBbCiAgICAgICAgICAgICJjdXN0b21lciIKICAgICAgICBdCiAgICB9LAogICAgInJvbGVfZ3JhbnRzIjogewogICAgICAgICJjdXN0b21lciI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImNhdCIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJhZG9wdCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAiYWRvcHQiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiZW1wbG95ZWUiOiBbCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJkb2ciCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAicmVhZCIsCiAgICAgICAgICAgICAgICAidHlwZSI6ICJjYXQiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImRvZyIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJ1cGRhdGUiLAogICAgICAgICAgICAgICAgInR5cGUiOiAiY2F0IgogICAgICAgICAgICB9CiAgICAgICAgXSwKICAgICAgICAiYmlsbGluZyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICAgImFjdGlvbiI6ICJyZWFkIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgICJhY3Rpb24iOiAidXBkYXRlIiwKICAgICAgICAgICAgICAgICJ0eXBlIjogImZpbmFuY2UiCiAgICAgICAgICAgIH0KICAgICAgICBdCiAgICB9Cn0K + policy: + role: cGFja2FnZSByb2xlCgppbXBvcnQgcmVnby52MQoKIyBCeSBkZWZhdWx0LCBkZW55IHJlcXVlc3RzLgpkZWZhdWx0IGFsbG93IDo9IGZhbHNlCgojIEFsbG93IGFkbWlucyB0byBkbyBhbnl0aGluZy4KYWxsb3cgaWYgdXNlcl9pc19hZG1pbgoKIyBBbGxvdyB0aGUgYWN0aW9uIGlmIHRoZSB1c2VyIGlzIGdyYW50ZWQgcGVybWlzc2lvbiB0byBwZXJmb3JtIHRoZSBhY3Rpb24uCmFsbG93IGlmIHsKICAgICAgICAjIEZpbmQgZ3JhbnRzIGZvciB0aGUgdXNlci4KICAgICAgICBzb21lIGdyYW50IGluIHVzZXJfaXNfZ3JhbnRlZAoKICAgICAgICAjIENoZWNrIGlmIHRoZSBncmFudCBwZXJtaXRzIHRoZSBhY3Rpb24uCiAgICAgICAgaW5wdXQuYWN0aW9uID09IGdyYW50LmFjdGlvbgogICAgICAgIGlucHV0LnR5cGUgPT0gZ3JhbnQudHlwZQp9CgojIHVzZXJfaXNfYWRtaW4gaXMgdHJ1ZSBpZiAiYWRtaW4iIGlzIGFtb25nIHRoZSB1c2VyJ3Mgcm9sZXMgYXMgcGVyIGRhdGEudXNlcl9yb2xlcwp1c2VyX2lzX2FkbWluIGlmICJhZG1pbiIgaW4gZGF0YS5yb2xlLnVzZXJfcm9sZXNbaW5wdXQudXNlcl0KCiMgdXNlcl9pc19ncmFudGVkIGlzIGEgc2V0IG9mIGdyYW50cyBmb3IgdGhlIHVzZXIgaWRlbnRpZmllZCBpbiB0aGUgcmVxdWVzdC4KIyBUaGUgYGdyYW50YCB3aWxsIGJlIGNvbnRhaW5lZCBpZiB0aGUgc2V0IGB1c2VyX2lzX2dyYW50ZWRgIGZvciBldmVyeS4uLgp1c2VyX2lzX2dyYW50ZWQgY29udGFpbnMgZ3JhbnQgaWYgewogICAgICAgICMgYHJvbGVgIGFzc2lnbmVkIGFuIGVsZW1lbnQgb2YgdGhlIHVzZXJfcm9sZXMgZm9yIHRoaXMgdXNlci4uLgogICAgICAgIHNvbWUgcm9sZSBpbiBkYXRhLnJvbGUudXNlcl9yb2xlc1tpbnB1dC51c2VyXQoKICAgICAgICAjIGBncmFudGAgYXNzaWduZWQgYSBzaW5nbGUgZ3JhbnQgZnJvbSB0aGUgZ3JhbnRzIGxpc3QgZm9yICdyb2xlJy4uLgogICAgICAgIHNvbWUgZ3JhbnQgaW4gZGF0YS5yb2xlLnJvbGVfZ3JhbnRzW3JvbGVdCn0KCiMgICAgICAgKiBSZWdvIGNvbXBhcmlzb24gdG8gb3RoZXIgc3lzdGVtczogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC9jb21wYXJpc29uLXRvLW90aGVyLXN5c3RlbXMvCiMgICAgICAgKiBSZWdvIEl0ZXJhdGlvbjogaHR0cHM6Ly93d3cub3BlbnBvbGljeWFnZW50Lm9yZy9kb2NzL2xhdGVzdC8jaXRlcmF0aW9uCgo= + name: role + version: 1.0.0 + metadata: + policy-id: role + policy-version: 1.0.0 + diff --git a/test/test_resources/policy_zone.yaml b/test/test_resources/policy_zone.yaml new file mode 100644 index 0000000..4f3ade1 --- /dev/null +++ b/test/test_resources/policy_zone.yaml @@ -0,0 +1,16 @@ +tosca_definitions_version: tosca_simple_yaml_1_1_0 +topology_template: + policies: + - zone: + type: onap.policies.native.opa + type_version: 1.0.0 + properties: + data: + zone: ewogICJ6b25lIjogewogICAgInpvbmVfYWNjZXNzX2xvZ3MiOiBbCiAgICAgIHsgImxvZ19pZCI6ICJsb2cxIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDA5OjAwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJncmFudGVkIiwgInVzZXIiOiAidXNlcjEiIH0sCiAgICAgIHsgImxvZ19pZCI6ICJsb2cyIiwgInRpbWVzdGFtcCI6ICIyMDI0LTExLTAxVDEwOjMwOjAwWiIsICJ6b25lX2lkIjogInpvbmVBIiwgImFjY2VzcyI6ICJkZW5pZWQiLCAidXNlciI6ICJ1c2VyMiIgfSwKICAgICAgeyAibG9nX2lkIjogImxvZzMiLCAidGltZXN0YW1wIjogIjIwMjQtMTEtMDFUMTE6MDA6MDBaIiwgInpvbmVfaWQiOiAiem9uZUIiLCAiYWNjZXNzIjogImdyYW50ZWQiLCAidXNlciI6ICJ1c2VyMyIgfQogICAgXQogIH0KfQo= + policy: + zone : cGFja2FnZSB6b25lCgppbXBvcnQgcmVnby52MQoKZGVmYXVsdCBhbGxvdyA6PSBmYWxzZQoKYWxsb3cgaWYgewogICAgaGFzX3pvbmVfYWNjZXNzCiAgICBhY3Rpb25faXNfbG9nX3ZpZXcKfQoKYWN0aW9uX2lzX2xvZ192aWV3IGlmIHsKICAgICJ2aWV3IiBpbiBpbnB1dC5hY3Rpb25zCn0KCmhhc196b25lX2FjY2VzcyBjb250YWlucyBhY2Nlc3NfZGF0YSBpZiB7CiAgICBzb21lIHpvbmVfZGF0YSBpbiBkYXRhLnpvbmUuem9uZS56b25lX2FjY2Vzc19sb2dzCiAgICB6b25lX2RhdGEudGltZXN0YW1wID49IGlucHV0LnRpbWVfcGVyaW9kLmZyb20KICAgIHpvbmVfZGF0YS50aW1lc3RhbXAgPCBpbnB1dC50aW1lX3BlcmlvZC50bwogICAgem9uZV9kYXRhLnpvbmVfaWQgPT0gaW5wdXQuem9uZV9pZAogICAgYWNjZXNzX2RhdGEgOj0ge2RhdGF0eXBlOiB6b25lX2RhdGFbZGF0YXR5cGVdIHwgZGF0YXR5cGUgaW4gaW5wdXQuZGF0YXR5cGVzfQp9Cg== + name: zone + version: 1.0.0 + metadata: + policy-id: zone + policy-version: 1.0.0 diff --git a/test/test_resources/undeploy_batch_delete.json b/test/test_resources/undeploy_batch_delete.json new file mode 100644 index 0000000..9195f74 --- /dev/null +++ b/test/test_resources/undeploy_batch_delete.json @@ -0,0 +1,23 @@ +{ + "groups": [ + { + "name": "opaGroup", + "deploymentSubgroups": [ + { + "pdpType": "opa", + "action": "DELETE", + "policies": [ + { + "name": "account", + "version": "1.0.0" + }, + { + "name": "organization", + "version": "1.0.0" + } + ] + } + ] + } + ] +} |