aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/kafkacomm
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/kafkacomm')
-rw-r--r--pkg/kafkacomm/handler/pdp_message_handler.go132
-rw-r--r--pkg/kafkacomm/handler/pdp_message_handler_test.go142
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler.go57
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler_test.go93
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler.go64
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler_test.go196
-rw-r--r--pkg/kafkacomm/mocks/kafkaconsumerinterface.go96
-rw-r--r--pkg/kafkacomm/mocks/kafkaproducerinterface.go51
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer.go103
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer_test.go129
-rw-r--r--pkg/kafkacomm/pdp_topic_producer.go107
-rw-r--r--pkg/kafkacomm/pdp_topic_producer_test.go117
-rw-r--r--pkg/kafkacomm/publisher/mocks/PdpStatusSender.go46
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat.go111
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat_test.go135
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration.go95
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration_test.go58
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher.go109
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher_test.go83
19 files changed, 1924 insertions, 0 deletions
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go
new file mode 100644
index 0000000..8d7da92
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_message_handler.go
@@ -0,0 +1,132 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// The handler package is responsible for processing messages from Kafka, specifically targeting the OPA
+// (Open Policy Agent) PDP (Policy Decision Point). It validates the message type,
+//
+// ensures it is relevant to the current PDP, and dispatches the message for appropriate processing.
+package handler
+
+import (
+ "encoding/json"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/pdpattributes"
+)
+
+type OpaPdpMessage struct {
+ Name string `json:"name"` // Name of the PDP (optional for broadcast messages).
+ MessageType string `json:"MessageName"` // Type of the message (e.g., PDP_UPDATE, PDP_STATE_CHANGE, etc.)
+ PdpGroup string `json:"pdpGroup"` // Group to which the PDP belongs.
+ PdpSubgroup string `json:"pdpSubgroup"` // Subgroup within the PDP group.
+}
+
+// Checks if the incoming Kafka message belongs to the current PDP instance.
+func checkIfMessageIsForOpaPdp(message OpaPdpMessage) bool {
+
+ if message.Name != "" {
+ // message included a PDP name, check if matches
+ //log.Infof(" Message Name is not empty")
+ return message.Name == pdpattributes.PdpName
+ }
+
+ // message does not provide a PDP name - must be a broadcast
+ if message.PdpGroup == "" {
+ //log.Infof(" Message PDP Group is empty")
+ return false
+ }
+
+ if pdpattributes.PdpSubgroup == "" {
+ // this PDP has no assignment yet, thus should ignore broadcast messages
+ //log.Infof(" pdpstate PDP subgroup is empty")
+ return false
+ }
+
+ if message.PdpGroup != consts.PdpGroup {
+ //log.Infof(" message pdp group is not equal to cons pdp group")
+ return false
+ }
+
+ if message.PdpSubgroup == "" {
+ //message was broadcast to entire group
+ //log.Infof(" message pdp subgroup is empty")
+ return true
+ }
+
+ return message.PdpSubgroup == pdpattributes.PdpSubgroup
+}
+
+// Handles incoming Kafka messages, validates their relevance to the current PDP,
+// and dispatches them for further processing based on their type.
+func PdpMessageHandler(kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
+
+ log.Debug("Starting PDP Message Listener.....")
+ var stopConsuming bool
+ for !stopConsuming {
+ message, err := kafkacomm.ReadKafkaMessages(kc)
+ if err != nil {
+ log.Warnf("Failed to Read Kafka Messages: %v\n", err)
+ continue
+ }
+ log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
+
+ if message != nil {
+
+ var opaPdpMessage OpaPdpMessage
+
+ err = json.Unmarshal(message, &opaPdpMessage)
+ if err != nil {
+ log.Warnf("Failed to UnMarshal Messages: %v\n", err)
+ continue
+ }
+
+ if !checkIfMessageIsForOpaPdp(opaPdpMessage) {
+
+ log.Warnf("Not a valid Opa Pdp Message")
+ continue
+ }
+
+ switch opaPdpMessage.MessageType {
+
+ case "PDP_UPDATE":
+ err = PdpUpdateMessageHandler(message, p)
+ if err != nil {
+ log.Warnf("Error processing Update Message: %v", err)
+ }
+
+ case "PDP_STATE_CHANGE":
+ err = PdpStateChangeMessageHandler(message, p)
+ if err != nil {
+ log.Warnf("Error processing Update Message: %v", err)
+ }
+
+ case "PDP_STATUS":
+ log.Debugf("discarding event of type PDP_STATUS")
+ continue
+ default:
+ log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType)
+ continue
+
+ }
+
+ }
+ }
+ return nil
+
+}
diff --git a/pkg/kafkacomm/handler/pdp_message_handler_test.go b/pkg/kafkacomm/handler/pdp_message_handler_test.go
new file mode 100644
index 0000000..3764c9e
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_message_handler_test.go
@@ -0,0 +1,142 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+ "github.com/stretchr/testify/assert"
+ "policy-opa-pdp/pkg/pdpattributes"
+ "testing"
+)
+
+/*
+checkIfMessageIsForOpaPdp_Check
+Description: Validating Message Attributes
+Input: PDP message
+Expected Output: Returning true stating all the values are validated successfully
+*/
+func TestCheckIfMessageIsForOpaPdp_Check(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = "opa-3a318049-813f-4172-b4d3-7d4f466e5b80"
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpSubgroup = "opa"
+
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message")
+
+}
+
+/*
+checkIfMessageIsForOpaPdp_Check_Message_Name
+Description: Validating Message Attributes
+Input: PDP message with name as empty
+Expected Output: Returning Error since it is not valid message
+*/
+func TestCheckIfMessageIsForOpaPdp_Check_Message_Name(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpSubgroup = "opa"
+
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message")
+
+}
+
+/*
+checkIfMessageIsForOpaPdp_Check_PdpGroup
+Description: Validating Message Attributes
+Input: PDP message with invalid PdpGroup
+Expected Output: Returning Error since it is not valid message
+*/
+func TestCheckIfMessageIsForOpaPdp_Check_PdpGroup(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpSubgroup = "opa"
+
+ pdpattributes.PdpSubgroup = "opa"
+ assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message")
+
+}
+
+/*
+checkIfMessageIsForOpaPdp_Check_EmptyPdpGroup
+Description: Validating Message Attributes
+Input: PDP Group Empty
+Expected Output: Returning Error since it is not valid message
+*/
+func TestCheckIfMessageIsForOpaPdp_Check_EmptyPdpGroup(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = ""
+ opapdpMessage.PdpSubgroup = "opa"
+
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message")
+
+}
+
+/*
+checkIfMessageIsForOpaPdp_Check_PdpSubgroup
+Description: Validating Message Attributes
+Input: PDP message with invalid PdpSubgroup
+Expected Output: Returning Error since it is not valid message
+*/
+func TestCheckIfMessageIsForOpaPdp_Check_PdpSubgroup(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpSubgroup = "opa"
+
+ pdpattributes.PdpSubgroup = "opa"
+ assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "It's a valid Opa Pdp Message")
+
+}
+
+/*
+checkIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup
+Description: Validating Message Attributes
+Input: PDP message with empty PdpSubgroup
+Expected Output: Returning Error since it is not valid message
+*/
+func TestCheckIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup(t *testing.T) {
+
+ var opapdpMessage OpaPdpMessage
+
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpSubgroup = "o"
+
+ pdpattributes.PdpSubgroup = "opa"
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message")
+
+}
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go
new file mode 100644
index 0000000..32d998f
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go
@@ -0,0 +1,57 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// will process the state change message from pap and send the pdp status response.
+package handler
+
+import (
+ "encoding/json"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpstate"
+)
+
+// Processes incoming messages indicating a PDP state change.
+// This includes updating the PDP state and sending a status response when the state transitions.
+func PdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+
+ var pdpStateChange model.PdpStateChange
+
+ err := json.Unmarshal(message, &pdpStateChange)
+ if err != nil {
+ log.Debugf("Failed to UnMarshal Messages: %v\n", err)
+ return err
+ }
+
+ log.Debugf("PDP STATE CHANGE message received: %s", string(message))
+
+ if pdpStateChange.State != "" {
+ pdpstate.SetState(pdpStateChange.State)
+
+ }
+
+ log.Debugf("State change from PASSIVE To : %s", pdpstate.GetState())
+ err = publisher.SendStateChangeResponse(p, &pdpStateChange)
+ if err != nil {
+ log.Debugf("Failed to Send State Change Response Message: %v\n", err)
+ return err
+ }
+ log.Infof("PDP_STATUS With State Change Message Sent Successfully")
+
+ return nil
+}
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
new file mode 100644
index 0000000..f7e8f84
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
@@ -0,0 +1,93 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpstate"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+// MockPdpStatusSender is a mock implementation of the PdpStatusSender interface
+type MockPdpStatusSender struct {
+ mock.Mock
+}
+
+func (m *MockPdpStatusSender) SendStateChangeResponse(p *publisher.PdpStatusSender, pdpStateChange *model.PdpStateChange) error {
+ args := m.Called(p, pdpStateChange)
+ return args.Error(0)
+}
+
+func (m *MockPdpStatusSender) SendPdpStatus(status model.PdpStatus) error {
+ args := m.Called(status)
+ return args.Error(0)
+}
+
+func TestPdpStateChangeMessageHandler(t *testing.T) {
+
+ // Create a mock PdpStatusSender
+ mockSender := new(MockPdpStatusSender)
+
+ // Define test cases
+ tests := []struct {
+ name string
+ message []byte
+ expectedState string
+ mockError error
+ expectError bool
+ }{
+ {
+ name: "Valid state change",
+ message: []byte(`{"state":"ACTIVE"}`),
+ expectedState: "ACTIVE",
+ mockError: nil,
+ expectError: false,
+ },
+ {
+ name: "Invalid JSON",
+ message: []byte(`{"state":}`),
+ mockError: nil,
+ expectError: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Set up the mock to return the expected error
+ mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(tt.mockError)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ // Call the handler
+ err := PdpStateChangeMessageHandler(tt.message, mockSender)
+
+ // Check the results
+ if tt.expectError {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expectedState, pdpstate.GetState().String())
+ }
+
+ })
+ }
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
new file mode 100644
index 0000000..632bcc8
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -0,0 +1,64 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// will process the update message from pap and send the pdp status response.
+package handler
+
+import (
+ "encoding/json"
+ "github.com/go-playground/validator/v10"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpattributes"
+)
+
+// Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP).
+// It validates the incoming data, updates PDP attributes, and sends a response back to the sender.
+func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error {
+
+ var pdpUpdate model.PdpUpdate
+ err := json.Unmarshal(message, &pdpUpdate)
+ if err != nil {
+ log.Debugf("Failed to UnMarshal Messages: %v\n", err)
+ return err
+ }
+ //Initialize Validator and validate Struct after unmarshalling
+ validate := validator.New()
+
+ err = validate.Struct(pdpUpdate)
+ if err != nil {
+ for _, err := range err.(validator.ValidationErrors) {
+ log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag())
+ }
+ return err
+ }
+
+ log.Debugf("PDP_UPDATE Message received: %s", string(message))
+
+ pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
+ pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
+
+ err = publisher.SendPdpUpdateResponse(p, &pdpUpdate)
+ if err != nil {
+ log.Debugf("Failed to Send Update Response Message: %v\n", err)
+ return err
+ }
+ log.Infof("PDP_STATUS Message Sent Successfully")
+ go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ return nil
+}
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
new file mode 100644
index 0000000..061f1ce
--- /dev/null
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -0,0 +1,196 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+ "errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "testing"
+)
+
+/*
+PdpUpdateMessageHandler_success
+Description: Test by sending a valid input message for pdp update
+Input: valid input
+Expected Output: PDP Update Message should be sent sucessfully.
+*/
+func TestPdpUpdateMessageHandler_Success(t *testing.T) {
+
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+ "pdpGroup":"defaultGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.NoError(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Message_Unmarshal_Failure1
+Description: Test by sending a invalid input message which should result in a Json unmarhsal error
+Input: invalid input Message by renaming params or removing certain params
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure1(t *testing.T) {
+
+ // sending only source parameter in the message string
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0"}`
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Message_Unmarshal_Failure2
+Description: Test by sending a invalid input message which should result in a Json unmarhsal error
+Input: invalid input Message by renaming params or removing certain params
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure2(t *testing.T) {
+
+ // invlaid params by mispelling a param "source"
+
+ messageString := `{
+ "soce":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000}`
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Message_Unmarshal_Failure3
+Description: Test by sending a invalid input message which should result in a Json unmarhsal error
+Input: {}
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure3(t *testing.T) {
+
+ // invlaid params by mispelling a param "source"
+
+ messageString := `{
+ "soce:"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000}`
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Message_Unmarshal_Failure4
+Description: Test by sending a invalid input message which should result in a Json unmarhsal error
+Input: empty
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure4(t *testing.T) {
+
+ // invlaid params by mispelling a param "source"
+
+ messageString := `""`
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Fails_Sending_PdpUpdateResponse
+Description: Test by sending a invalid attribute for pdpstate which should result in a failure in sending pdp update response
+Input: invalid input config set for pdpstate
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) {
+
+ // invalid value set to pdpSubgroup -->empty ""
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+ "pdpGroup":"defaultGroup"
+ }`
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error in Sending PDP Update Response"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
+
+/*
+PdpUpdateMessageHandler_Invalid_Starttimeinterval
+Description: Test by sending a invalid time value attribute for pdpstate which should result in a failure in starting heartbeat interval
+Input: invalid input message for pdpstate heartbeat interval
+Expected Output: Message Handler should exit gracefully stating the error.
+*/
+func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) {
+
+ //invalid interval set to negative -1000
+ messageString := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":-1000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
+ "pdpGroup":"defaultGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Invalid Interval Time for Heartbeat"))
+
+ err := PdpUpdateMessageHandler([]byte(messageString), mockSender)
+ assert.Error(t, err)
+
+}
diff --git a/pkg/kafkacomm/mocks/kafkaconsumerinterface.go b/pkg/kafkacomm/mocks/kafkaconsumerinterface.go
new file mode 100644
index 0000000..ca5140e
--- /dev/null
+++ b/pkg/kafkacomm/mocks/kafkaconsumerinterface.go
@@ -0,0 +1,96 @@
+// Code generated by mockery v2.46.3. DO NOT EDIT.
+
+package mocks
+
+import (
+ kafka "github.com/confluentinc/confluent-kafka-go/kafka"
+
+ mock "github.com/stretchr/testify/mock"
+
+ time "time"
+)
+
+// KafkaConsumerInterface is an autogenerated mock type for the KafkaConsumerInterface type
+type KafkaConsumerInterface struct {
+ mock.Mock
+}
+
+// Close provides a mock function with given fields:
+func (_m *KafkaConsumerInterface) Close() error {
+ ret := _m.Called()
+
+ if len(ret) == 0 {
+ panic("no return value specified for Close")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func() error); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// ReadMessage provides a mock function with given fields: timeout
+func (_m *KafkaConsumerInterface) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+ ret := _m.Called(timeout)
+
+ if len(ret) == 0 {
+ panic("no return value specified for ReadMessage")
+ }
+
+ var r0 *kafka.Message
+ var r1 error
+ if rf, ok := ret.Get(0).(func(time.Duration) (*kafka.Message, error)); ok {
+ return rf(timeout)
+ }
+ if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok {
+ r0 = rf(timeout)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*kafka.Message)
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(time.Duration) error); ok {
+ r1 = rf(timeout)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Unsubscribe provides a mock function with given fields:
+func (_m *KafkaConsumerInterface) Unsubscribe() error {
+ ret := _m.Called()
+
+ if len(ret) == 0 {
+ panic("no return value specified for Unsubscribe")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func() error); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// NewKafkaConsumerInterface creates a new instance of KafkaConsumerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewKafkaConsumerInterface(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *KafkaConsumerInterface {
+ mock := &KafkaConsumerInterface{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/pkg/kafkacomm/mocks/kafkaproducerinterface.go b/pkg/kafkacomm/mocks/kafkaproducerinterface.go
new file mode 100644
index 0000000..97b6f53
--- /dev/null
+++ b/pkg/kafkacomm/mocks/kafkaproducerinterface.go
@@ -0,0 +1,51 @@
+// Code generated by mockery v2.46.3. DO NOT EDIT.
+
+package mocks
+
+import (
+ kafka "github.com/confluentinc/confluent-kafka-go/kafka"
+
+ mock "github.com/stretchr/testify/mock"
+)
+
+// KafkaProducerInterface is an autogenerated mock type for the KafkaProducerInterface type
+type KafkaProducerInterface struct {
+ mock.Mock
+}
+
+// Close provides a mock function with given fields:
+func (_m *KafkaProducerInterface) Close() {
+ _m.Called()
+}
+
+// Produce provides a mock function with given fields: _a0, _a1
+func (_m *KafkaProducerInterface) Produce(_a0 *kafka.Message, _a1 chan kafka.Event) error {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for Produce")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(*kafka.Message, chan kafka.Event) error); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// NewKafkaProducerInterface creates a new instance of KafkaProducerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewKafkaProducerInterface(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *KafkaProducerInterface {
+ mock := &KafkaProducerInterface{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go
new file mode 100644
index 0000000..4858bdf
--- /dev/null
+++ b/pkg/kafkacomm/pdp_topic_consumer.go
@@ -0,0 +1,103 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// kafkacomm package provides a structured way to create and manage Kafka consumers,
+// handle subscriptions, and read messages from Kafka topics
+package kafkacomm
+
+import (
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "policy-opa-pdp/cfg"
+ "policy-opa-pdp/pkg/log"
+ "time"
+)
+
+// KafkaConsumerInterface defines the interface for a Kafka consumer.
+type KafkaConsumerInterface interface {
+ Close() error
+ Unsubscribe() error
+ ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+// KafkaConsumer is a wrapper around the Kafka consumer.
+type KafkaConsumer struct {
+ Consumer KafkaConsumerInterface
+}
+
+// Close closes the KafkaConsumer
+func (kc *KafkaConsumer) Close() {
+ kc.Consumer.Close()
+}
+
+// Unsubscribe unsubscribes the KafkaConsumer
+func (kc *KafkaConsumer) Unsubscribe() error {
+ if err := kc.Consumer.Unsubscribe(); err != nil {
+ log.Warnf("Error Unsubscribing :%v", err)
+ return err
+ }
+ log.Debug("Unsubscribe From Topic")
+ return nil
+}
+
+// creates a new Kafka consumer and returns it
+func NewKafkaConsumer() (*KafkaConsumer, error) {
+ brokers := cfg.BootstrapServer
+ groupid := cfg.GroupId
+ topic := cfg.Topic
+ useSASL := cfg.UseSASLForKAFKA
+ username := cfg.KAFKA_USERNAME
+ password := cfg.KAFKA_PASSWORD
+
+ // Add Kafka Connection Properties ....
+ configMap := &kafka.ConfigMap{
+ "bootstrap.servers": brokers,
+ "group.id": groupid,
+ "auto.offset.reset": "earliest",
+ }
+ //for STRIMZI-KAFKA in case sasl is enabled
+ if useSASL == "true" {
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")
+ configMap.SetKey("sasl.username", username)
+ configMap.SetKey("sasl.password", password)
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
+ }
+
+ // create new Kafka Consumer
+ consumer, err := kafka.NewConsumer(configMap)
+ if err != nil {
+ log.Warnf("Error creating consumer: %v\n", err)
+ return nil, err
+ }
+ //subscribe to topic
+ err = consumer.SubscribeTopics([]string{topic}, nil)
+ if err != nil {
+ log.Warnf("Error subcribing to topic: %v\n", err)
+ return nil, err
+ }
+ log.Debugf("Topic Subscribed... : %v", topic)
+ return &KafkaConsumer{Consumer: consumer}, nil
+}
+
+// gets the Kafka messages on the subscribed topic
+func ReadKafkaMessages(kc *KafkaConsumer) ([]byte, error) {
+ msg, err := kc.Consumer.ReadMessage(-1)
+ if err != nil {
+ log.Warnf("Error reading Kafka message: %v", err)
+ return nil, err
+ }
+ return msg.Value, nil
+}
diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go
new file mode 100644
index 0000000..2fdfa90
--- /dev/null
+++ b/pkg/kafkacomm/pdp_topic_consumer_test.go
@@ -0,0 +1,129 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package kafkacomm
+
+import (
+ "errors"
+ "policy-opa-pdp/pkg/kafkacomm/mocks"
+ "testing"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+func TestNewKafkaConsumer(t *testing.T) {
+ // Assuming configuration is correctly loaded from cfg package
+ // You can mock or override cfg values here if needed
+
+ consumer, err := NewKafkaConsumer()
+ assert.NoError(t, err, "Expected no error when creating Kafka consumer")
+ assert.NotNil(t, consumer, "Expected a non-nil KafkaConsumer")
+
+ // Clean up
+ if consumer != nil {
+ consumer.Close()
+ }
+}
+
+func TestReadKafkaMessages_Success(t *testing.T) {
+ // Create a new mock for ConsumerInterface
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+
+ // Create a KafkaConsumer with the mock
+ kc := &KafkaConsumer{Consumer: mockConsumer}
+
+ // Define the expected message
+ expectedMsg := &kafka.Message{Value: []byte("test message")}
+
+ // Set up the mock to return the expected message
+ mockConsumer.On("ReadMessage", mock.Anything).Return(expectedMsg, nil)
+
+ // Test ReadKafkaMessages
+ msg, err := ReadKafkaMessages(kc)
+ assert.NoError(t, err, "Expected no error when reading message")
+ assert.Equal(t, expectedMsg.Value, msg, "Expected message content to match")
+
+ // Assert expectations
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestReadKafkaMessages_Error(t *testing.T) {
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+
+ kc := &KafkaConsumer{Consumer: mockConsumer}
+
+ // Set up the mock to return an error
+ expectedErr := errors.New("read error")
+ mockConsumer.On("ReadMessage", mock.Anything).Return(nil, expectedErr)
+
+ msg, err := ReadKafkaMessages(kc)
+ assert.Error(t, err, "Expected an error when reading message")
+ assert.Nil(t, msg, "Expected message to be nil on error")
+
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestKafkaConsumer_Close(t *testing.T) {
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+
+ kc := &KafkaConsumer{Consumer: mockConsumer}
+
+ // Set up the mock for Close
+ mockConsumer.On("Close").Return(nil)
+
+ // Test Close method
+ kc.Close()
+
+ // Verify that Close was called
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestKafkaConsumer_Unsubscribe(t *testing.T) {
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+
+ kc := &KafkaConsumer{Consumer: mockConsumer}
+
+ // Set up the mock for Unsubscribe
+ mockConsumer.On("Unsubscribe").Return(nil)
+
+ // Test Unsubscribe method
+ err := kc.Unsubscribe()
+ assert.NoError(t, err)
+
+ // Verify that Unsubscribe was called
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestKafkaConsumer_Unsubscribe_Error(t *testing.T) {
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockError := errors.New("Unsubscribe error")
+ kc := &KafkaConsumer{Consumer: mockConsumer}
+
+ // Set up the mock for Unsubscribe
+ mockConsumer.On("Unsubscribe").Return(mockError)
+
+ // Test Unsubscribe method
+ err := kc.Unsubscribe()
+ assert.Error(t, err)
+ assert.Equal(t, mockError, err)
+
+ // Verify that Unsubscribe was called
+ mockConsumer.AssertExpectations(t)
+}
diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go
new file mode 100644
index 0000000..1b11b35
--- /dev/null
+++ b/pkg/kafkacomm/pdp_topic_producer.go
@@ -0,0 +1,107 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// Package kafkacomm provides utilities for producing messages to a Kafka topic
+// using a configurable Kafka producer. It supports SASL authentication and
+// dynamic topic configuration.
+package kafkacomm
+
+import (
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "policy-opa-pdp/cfg"
+ "sync"
+ "log"
+)
+
+type KafkaProducerInterface interface {
+ Produce(*kafka.Message, chan kafka.Event) error
+ Close()
+}
+
+// KafkaProducer wraps a Kafka producer instance and a topic to provide
+// a simple interface for producing messages.
+type KafkaProducer struct {
+ producer KafkaProducerInterface
+ topic string
+}
+
+var (
+ instance *KafkaProducer
+ once sync.Once
+)
+
+// GetKafkaProducer initializes and returns a KafkaProducer instance which is a singleton.
+// It configures the Kafka producer with the given bootstrap servers and topic.
+// If SASL authentication is enabled via the configuration, the necessary credentials
+// are set in the producer configuration.
+func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) {
+ var err error
+ once.Do(func() {
+ brokers := cfg.BootstrapServer
+ useSASL := cfg.UseSASLForKAFKA
+ username := cfg.KAFKA_USERNAME
+ password := cfg.KAFKA_PASSWORD
+
+ // Add Kafka Connection Properties ....
+ configMap := &kafka.ConfigMap{
+ "bootstrap.servers": brokers,
+ }
+
+ if useSASL == "true" {
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")
+ configMap.SetKey("sasl.username", username)
+ configMap.SetKey("sasl.password", password)
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
+ }
+
+ p, err := kafka.NewProducer(configMap)
+ if err != nil {
+ return
+ }
+ instance = &KafkaProducer{
+ producer: p,
+ topic: topic,
+ }
+
+ })
+ return instance, err
+}
+
+// Produce sends a message to the configured Kafka topic.
+// It takes the message payload as a byte slice and returns any errors
+func (kp *KafkaProducer) Produce(message []byte) error {
+ kafkaMessage := &kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: kafka.PartitionAny},
+ Value: []byte(message),
+ }
+ err := kp.producer.Produce(kafkaMessage, nil)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// Close shuts down the Kafka producer, releasing all resources.
+func (kp *KafkaProducer) Close() {
+
+ if kp == nil || kp.producer == nil {
+ log.Println("KafkaProducer or producer is nil, skipping Close.")
+ return
+ }
+ kp.producer.Close()
+ log.Println("KafkaProducer closed successfully.")
+}
diff --git a/pkg/kafkacomm/pdp_topic_producer_test.go b/pkg/kafkacomm/pdp_topic_producer_test.go
new file mode 100644
index 0000000..55f3bc8
--- /dev/null
+++ b/pkg/kafkacomm/pdp_topic_producer_test.go
@@ -0,0 +1,117 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package kafkacomm
+
+import (
+ "errors"
+ "testing"
+ "time"
+ // "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+
+ "policy-opa-pdp/pkg/kafkacomm/mocks" // Adjust to your actual mock path
+)
+
+func TestKafkaProducer_Produce_Success(t *testing.T) {
+ done := make(chan struct{})
+
+ go func() {
+ defer close(done)
+ // Arrange
+ mockProducer := new(mocks.KafkaProducerInterface)
+ topic := "test-topic"
+ kp := &KafkaProducer{
+ producer: mockProducer,
+ topic: topic,
+ }
+
+ message := []byte("test message")
+
+ // Mock Produce method to simulate successful delivery
+ mockProducer.On("Produce", mock.Anything, mock.Anything).Return(nil)
+
+ // Act
+ err := kp.Produce(message)
+
+ assert.NoError(t, err)
+ mockProducer.AssertExpectations(t)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timed out")
+ }
+
+}
+
+func TestKafkaProducer_Produce_Error(t *testing.T) {
+ // Arrange
+ mockProducer := new(mocks.KafkaProducerInterface)
+ topic := "test-topic"
+ kp := &KafkaProducer{
+ producer: mockProducer,
+ topic: topic,
+ }
+
+ // Simulate production error
+ mockProducer.On("Produce", mock.Anything, mock.Anything).Return(errors.New("produce error"))
+
+ // Act
+ err := kp.Produce([]byte("test message"))
+
+ // Assert
+ assert.Error(t, err)
+ assert.Equal(t, "produce error", err.Error())
+ mockProducer.AssertExpectations(t)
+}
+
+func TestKafkaProducer_Close(t *testing.T) {
+ // Arrange
+ mockProducer := new(mocks.KafkaProducerInterface)
+ kp := &KafkaProducer{
+ producer: mockProducer,
+ }
+
+ // Simulate successful close
+ mockProducer.On("Close").Return()
+
+ // Act
+ kp.Close()
+
+ // Assert
+ mockProducer.AssertExpectations(t)
+}
+
+func TestKafkaProducer_Close_Error(t *testing.T) {
+ // Arrange
+ mockProducer := new(mocks.KafkaProducerInterface)
+ kp := &KafkaProducer{
+ producer: mockProducer,
+ }
+
+ // Simulate close error
+ mockProducer.On("Close").Return()
+
+ // Act
+ kp.Close()
+
+ // Assert
+ mockProducer.AssertExpectations(t)
+}
diff --git a/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go b/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go
new file mode 100644
index 0000000..f9cc279
--- /dev/null
+++ b/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go
@@ -0,0 +1,46 @@
+// Code generated by mockery v2.46.3. DO NOT EDIT.
+
+package mocks
+
+import (
+ model "policy-opa-pdp/pkg/model"
+
+ mock "github.com/stretchr/testify/mock"
+)
+
+// PdpStatusSender is an autogenerated mock type for the PdpStatusSender type
+type PdpStatusSender struct {
+ mock.Mock
+}
+
+// SendPdpStatus provides a mock function with given fields: pdpStatus
+func (_m *PdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
+ ret := _m.Called(pdpStatus)
+
+ if len(ret) == 0 {
+ panic("no return value specified for SendPdpStatus")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(model.PdpStatus) error); ok {
+ r0 = rf(pdpStatus)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// NewPdpStatusSender creates a new instance of PdpStatusSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewPdpStatusSender(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *PdpStatusSender {
+ mock := &PdpStatusSender{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go
new file mode 100644
index 0000000..f814992
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go
@@ -0,0 +1,111 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// The publisher package is responsible for managing periodic heartbeat messages for the
+// Open Policy Agent (OPA) Policy Decision Point (PDP) and publishing the PDP's status to relevant channels.
+// It provides functions to initialize, manage, and stop timers for sending heartbeat messages,
+// ensuring the PDP communicates its health and state periodically.
+package publisher
+
+import (
+ "fmt"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpattributes"
+ "policy-opa-pdp/pkg/pdpstate"
+ "time"
+
+ "github.com/google/uuid"
+)
+
+var (
+ ticker *time.Ticker
+ stopChan chan bool
+ currentInterval int64
+)
+
+// Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
+func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
+ if intervalMs <= 0 {
+ log.Errorf("Invalid interval provided: %d. Interval must be greater than zero.", intervalMs)
+ ticker = nil
+ return
+ }
+
+ if ticker != nil && intervalMs == currentInterval {
+ log.Debug("Ticker is already running")
+ return
+ }
+
+ if ticker != nil {
+ ticker.Stop()
+ }
+ // StopTicker()
+ currentInterval = intervalMs
+
+ ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
+ log.Debugf("New Ticker %d", currentInterval)
+ stopChan = make(chan bool)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ sendPDPHeartBeat(s)
+ case <-stopChan:
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+}
+
+// Creates and sends a heartbeat message with the PDP's current state, health, and attributes
+func sendPDPHeartBeat(s PdpStatusSender) error {
+ pdpStatus := model.PdpStatus{
+ MessageType: model.PDP_STATUS,
+ PdpType: consts.PdpType,
+ State: pdpstate.GetState(),
+ Healthy: model.Healthy,
+ Name: pdpattributes.PdpName,
+ Description: "Pdp heartbeat",
+ PdpGroup: consts.PdpGroup,
+ PdpSubgroup: &pdpattributes.PdpSubgroup,
+ }
+ pdpStatus.RequestID = uuid.New().String()
+ pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+
+ err := s.SendPdpStatus(pdpStatus)
+ log.Debugf("Sending Heartbeat ...")
+ if err != nil {
+ log.Warnf("Error producing message: %v\n", err)
+ return err
+ } else {
+ return nil
+ }
+}
+
+// Stops the running ticker and terminates the goroutine managing heartbeat messages.
+func StopTicker() {
+ if ticker != nil && stopChan != nil {
+ stopChan <- true
+ close(stopChan)
+ ticker = nil
+ } else {
+ log.Debugf("Ticker is not Running")
+ }
+}
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
new file mode 100644
index 0000000..f03b0eb
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
@@ -0,0 +1,135 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+ /* "fmt"
+ "policy-opa-pdp/cfg"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpstate"*/
+ "errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "testing"
+ // "time"
+ /* "github.com/google/uuid"*/)
+
+var (
+// ticker *time.Ticker
+// stopChan chan bool
+// currentInterval int64
+)
+
+/*
+Success Case 1
+TestStartHeartbeatIntervalTimer_ValidInterval
+Description: Test starting the heartbeat interval timer with a valid interval.
+Input: intervalMs = 1000
+Expected Output: The ticker starts with an interval of 1000 milliseconds, and heartbeat messages are sent at this interval.
+*/
+func TestStartHeartbeatIntervalTimer_ValidInterval(t *testing.T) {
+
+ intervalMs := int64(1000)
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ StartHeartbeatIntervalTimer(intervalMs, mockSender)
+ if ticker == nil {
+ t.Errorf("Expected ticker to be initialized")
+ }
+ if currentInterval != intervalMs {
+ t.Errorf("Expected currentInterval to be %d, got %d", intervalMs, currentInterval)
+ }
+}
+
+/*
+Failure Case 1
+TestStartHeartbeatIntervalTimer_InvalidInterval
+Description: Test starting the heartbeat interval timer with an invalid interval.
+Input: intervalMs = -1000
+Expected Output: The function should handle the invalid interval gracefully, possibly by logging an error message and not starting the ticker.
+*/
+func TestStartHeartbeatIntervalTimer_InvalidInterval(t *testing.T) {
+ intervalMs := int64(-1000)
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ StartHeartbeatIntervalTimer(intervalMs, mockSender)
+
+ if ticker != nil {
+ t.Log("Expected ticker to be nil for invalid interval")
+ }
+}
+
+/*
+TestSendPDPHeartBeat_Success 2
+Description: Test sending a heartbeat successfully.
+Input: Valid pdpStatus object
+Expected Output: Heartbeat message is sent successfully, and a debug log "Message sent successfully" is generated.
+*/
+func TestSendPDPHeartBeat_Success(t *testing.T) {
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ err := sendPDPHeartBeat(mockSender)
+ assert.NoError(t, err)
+}
+
+/*
+TestSendPDPHeartBeat_Failure 2
+Description: Test failing to send a heartbeat.
+Input: Invalid pdpStatus object or network failure
+Expected Output: An error occurs while sending the heartbeat, and a warning log "Error producing message: ..." is generated.
+*/
+func TestSendPDPHeartBeat_Failure(t *testing.T) {
+ // Mock SendPdpStatus to return an error
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error producing message"))
+ err := sendPDPHeartBeat(mockSender)
+ assert.Error(t, err)
+}
+
+/*
+TestStopTicker_Success 3
+Description: Test stopping the ticker.
+Input: Ticker is running
+Expected Output: The ticker stops, and the stop channel is closed.
+*/
+func TestStopTicker_Success(t *testing.T) {
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ StartHeartbeatIntervalTimer(1000, mockSender)
+ StopTicker()
+ if ticker != nil {
+ t.Errorf("Expected ticker to be nil")
+ }
+}
+
+/*
+TestStopTicker_NotRunning 3
+Description: Test stopping the ticker when it is not running.
+Input: Ticker is not running
+Expected Output: The function should handle this case gracefully, possibly by logging a debug message indicating that the ticker is not running.
+*/
+func TestStopTicker_NotRunning(t *testing.T) {
+ StopTicker()
+}
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go
new file mode 100644
index 0000000..75f22d6
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go
@@ -0,0 +1,95 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+// allows to send the pdp registartion message with unique transaction id and timestamp to topic
+package publisher
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/google/uuid"
+ "policy-opa-pdp/cfg"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpattributes"
+ "time"
+)
+
+type PdpStatusSender interface {
+ SendPdpStatus(pdpStatus model.PdpStatus) error
+}
+
+type RealPdpStatusSender struct{}
+
+// Sends PdpSTatus Message type to KafkaTopic
+func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
+
+ var topic string
+ bootstrapServers := cfg.BootstrapServer
+ topic = cfg.Topic
+ pdpStatus.RequestID = uuid.New().String()
+ pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+
+ jsonMessage, err := json.Marshal(pdpStatus)
+ if err != nil {
+ log.Warnf("failed to marshal PdpStatus to JSON: %v", err)
+ return err
+ }
+
+ producer, err := kafkacomm.GetKafkaProducer(bootstrapServers, topic)
+ if err != nil {
+ log.Warnf("Error creating Kafka producer: %v\n", err)
+ return err
+ }
+
+ err = producer.Produce(jsonMessage)
+ if err != nil {
+ log.Warnf("Error producing message: %v\n", err)
+ } else {
+ log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
+ }
+
+ return nil
+}
+
+// sends the registartion message to topic using SendPdpStatus(pdpStatus)
+func SendPdpPapRegistration(s PdpStatusSender) error {
+
+ var pdpStatus = model.PdpStatus{
+ MessageType: model.PDP_STATUS,
+ PdpType: consts.PdpType,
+ State: model.Passive,
+ Healthy: model.Healthy,
+ Policies: nil,
+ PdpResponse: nil,
+ Name: pdpattributes.PdpName,
+ Description: "Pdp Status Registration Message",
+ PdpGroup: consts.PdpGroup,
+ }
+
+ log.Debugf("Sending PDP PAP Registration Message")
+
+ err := s.SendPdpStatus(pdpStatus)
+ if err != nil {
+ log.Warnf("Error producing message: %v\n", err)
+ return err
+ }
+ return nil
+
+}
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
new file mode 100644
index 0000000..03749de
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
@@ -0,0 +1,58 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+ "errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "policy-opa-pdp/pkg/model"
+ "testing"
+)
+
+type MockPdpStatusSender struct {
+ mock.Mock
+}
+
+func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
+ return m.Called(pdpStatus).Error(0)
+
+}
+
+func TestSendPdpPapRegistration_Success(t *testing.T) {
+ mockSender := new(mocks.PdpStatusSender)
+
+ mockSender.On("SendPdpStatus", mock.AnythingOfType("model.PdpStatus")).Return(nil)
+
+ err := SendPdpPapRegistration(mockSender)
+ assert.NoError(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.AnythingOfType("model.PdpStatus"))
+}
+
+func TestSendPdpPapRegistration_Failure(t *testing.T) {
+ mockSender := new(mocks.PdpStatusSender)
+
+ mockSender.On("SendPdpStatus", mock.AnythingOfType("model.PdpStatus")).Return(errors.New("failed To Send"))
+
+ err := SendPdpPapRegistration(mockSender)
+ assert.Error(t, err, "Expected an error for failure")
+ assert.EqualError(t, err, "failed To Send", "Error messages should match")
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.AnythingOfType("model.PdpStatus"))
+}
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go
new file mode 100644
index 0000000..756d0f2
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go
@@ -0,0 +1,109 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+// responsible for sending PDP_STATUS messages in response to specific events
+// such as updates (PDP_UPDATE) or state changes (PDP_STATE_CHANGE). These responses provide details
+// about the current state, health, and attributes of the Policy Decision Point (PDP).
+package publisher
+
+import (
+ "fmt"
+ "policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "policy-opa-pdp/pkg/pdpattributes"
+ "policy-opa-pdp/pkg/pdpstate"
+ "time"
+
+ "github.com/google/uuid"
+)
+
+// Sends a PDP_STATUS message to indicate the successful processing of a PDP_UPDATE request
+// received from the Policy Administration Point (PAP).
+func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error {
+
+ responseStatus := model.Success
+ responseMessage := "PDP Update was Successful"
+
+ pdpStatus := model.PdpStatus{
+ MessageType: model.PDP_STATUS,
+ PdpType: consts.PdpType,
+ State: pdpstate.State,
+ Healthy: model.Healthy,
+ Name: pdpattributes.PdpName,
+ Description: "Pdp Status Response Message For Pdp Update",
+ PdpGroup: consts.PdpGroup,
+ PdpSubgroup: &pdpattributes.PdpSubgroup,
+ // Policies: [],
+ PdpResponse: &model.PdpResponseDetails{
+ ResponseTo: &pdpUpdate.RequestId,
+ ResponseStatus: &responseStatus,
+ ResponseMessage: &responseMessage,
+ },
+ }
+
+ pdpStatus.RequestID = uuid.New().String()
+ pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+
+ log.Infof("Sending PDP Status With Update Response")
+
+ err := s.SendPdpStatus(pdpStatus)
+ if err != nil {
+ log.Warnf("Failed to send PDP Update Message : %v", err)
+ return err
+ }
+
+ return nil
+
+}
+
+// Sends a PDP_STATUS message to indicate a state change in the PDP (e.g., from PASSIVE to ACTIVE).
+func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateChange) error {
+
+ responseStatus := model.Success
+ responseMessage := "PDP State Changed From PASSIVE TO Active"
+ pdpStatus := model.PdpStatus{
+ MessageType: model.PDP_STATUS,
+ PdpType: consts.PdpType,
+ State: pdpstate.GetState(),
+ Healthy: model.Healthy,
+ Name: pdpattributes.PdpName,
+ Description: "Pdp Status Response Message to Pdp State Change",
+ PdpGroup: consts.PdpGroup,
+ PdpSubgroup: &pdpattributes.PdpSubgroup,
+ // Policies: [],
+ PdpResponse: &model.PdpResponseDetails{
+ ResponseTo: &pdpStateChange.RequestId,
+ ResponseStatus: &responseStatus,
+ ResponseMessage: &responseMessage,
+ },
+ }
+
+ pdpStatus.RequestID = uuid.New().String()
+ pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
+
+ log.Infof("Sending PDP Status With State Change response")
+
+ err := s.SendPdpStatus(pdpStatus)
+ if err != nil {
+ log.Warnf("Failed to send PDP Update Message : %v", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
new file mode 100644
index 0000000..5e02704
--- /dev/null
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
@@ -0,0 +1,83 @@
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2024: Deutsche Telecom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+ "errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "policy-opa-pdp/pkg/model"
+ "testing"
+)
+
+// TestSendPdpUpdateResponse_Success tests SendPdpUpdateResponse for a successful response
+func TestSendPdpUpdateResponse_Success(t *testing.T) {
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+ assert.NoError(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+}
+
+// TestSendPdpUpdateResponse_Failure tests SendPdpUpdateResponse when SendPdpStatus fails
+func TestSendPdpUpdateResponse_Failure(t *testing.T) {
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("mock send error"))
+
+ pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"}
+
+ err := SendPdpUpdateResponse(mockSender, pdpUpdate)
+
+ assert.Error(t, err)
+
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+}
+
+// TestSendStateChangeResponse_Success tests SendStateChangeResponse for a successful state change response
+func TestSendStateChangeResponse_Success(t *testing.T) {
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ pdpStateChange := &model.PdpStateChange{RequestId: "test-state-change-id"}
+
+ err := SendStateChangeResponse(mockSender, pdpStateChange)
+
+ assert.NoError(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+}
+
+// TestSendStateChangeResponse_Failure tests SendStateChangeResponse when SendPdpStatus fails
+func TestSendStateChangeResponse_Failure(t *testing.T) {
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("mock send error"))
+
+ pdpStateChange := &model.PdpStateChange{RequestId: "test-state-change-id"}
+
+ err := SendStateChangeResponse(mockSender, pdpStateChange)
+ assert.Error(t, err)
+ mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything)
+
+}