diff options
Diffstat (limited to 'pkg/kafkacomm')
19 files changed, 1924 insertions, 0 deletions
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go new file mode 100644 index 0000000..8d7da92 --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_message_handler.go @@ -0,0 +1,132 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// The handler package is responsible for processing messages from Kafka, specifically targeting the OPA +// (Open Policy Agent) PDP (Policy Decision Point). It validates the message type, +// +// ensures it is relevant to the current PDP, and dispatches the message for appropriate processing. +package handler + +import ( + "encoding/json" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/pdpattributes" +) + +type OpaPdpMessage struct { + Name string `json:"name"` // Name of the PDP (optional for broadcast messages). + MessageType string `json:"MessageName"` // Type of the message (e.g., PDP_UPDATE, PDP_STATE_CHANGE, etc.) + PdpGroup string `json:"pdpGroup"` // Group to which the PDP belongs. + PdpSubgroup string `json:"pdpSubgroup"` // Subgroup within the PDP group. +} + +// Checks if the incoming Kafka message belongs to the current PDP instance. +func checkIfMessageIsForOpaPdp(message OpaPdpMessage) bool { + + if message.Name != "" { + // message included a PDP name, check if matches + //log.Infof(" Message Name is not empty") + return message.Name == pdpattributes.PdpName + } + + // message does not provide a PDP name - must be a broadcast + if message.PdpGroup == "" { + //log.Infof(" Message PDP Group is empty") + return false + } + + if pdpattributes.PdpSubgroup == "" { + // this PDP has no assignment yet, thus should ignore broadcast messages + //log.Infof(" pdpstate PDP subgroup is empty") + return false + } + + if message.PdpGroup != consts.PdpGroup { + //log.Infof(" message pdp group is not equal to cons pdp group") + return false + } + + if message.PdpSubgroup == "" { + //message was broadcast to entire group + //log.Infof(" message pdp subgroup is empty") + return true + } + + return message.PdpSubgroup == pdpattributes.PdpSubgroup +} + +// Handles incoming Kafka messages, validates their relevance to the current PDP, +// and dispatches them for further processing based on their type. +func PdpMessageHandler(kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { + + log.Debug("Starting PDP Message Listener.....") + var stopConsuming bool + for !stopConsuming { + message, err := kafkacomm.ReadKafkaMessages(kc) + if err != nil { + log.Warnf("Failed to Read Kafka Messages: %v\n", err) + continue + } + log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) + + if message != nil { + + var opaPdpMessage OpaPdpMessage + + err = json.Unmarshal(message, &opaPdpMessage) + if err != nil { + log.Warnf("Failed to UnMarshal Messages: %v\n", err) + continue + } + + if !checkIfMessageIsForOpaPdp(opaPdpMessage) { + + log.Warnf("Not a valid Opa Pdp Message") + continue + } + + switch opaPdpMessage.MessageType { + + case "PDP_UPDATE": + err = PdpUpdateMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + + case "PDP_STATE_CHANGE": + err = PdpStateChangeMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + + case "PDP_STATUS": + log.Debugf("discarding event of type PDP_STATUS") + continue + default: + log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType) + continue + + } + + } + } + return nil + +} diff --git a/pkg/kafkacomm/handler/pdp_message_handler_test.go b/pkg/kafkacomm/handler/pdp_message_handler_test.go new file mode 100644 index 0000000..3764c9e --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_message_handler_test.go @@ -0,0 +1,142 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package handler + +import ( + "github.com/stretchr/testify/assert" + "policy-opa-pdp/pkg/pdpattributes" + "testing" +) + +/* +checkIfMessageIsForOpaPdp_Check +Description: Validating Message Attributes +Input: PDP message +Expected Output: Returning true stating all the values are validated successfully +*/ +func TestCheckIfMessageIsForOpaPdp_Check(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "opa-3a318049-813f-4172-b4d3-7d4f466e5b80" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpSubgroup = "opa" + + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message") + +} + +/* +checkIfMessageIsForOpaPdp_Check_Message_Name +Description: Validating Message Attributes +Input: PDP message with name as empty +Expected Output: Returning Error since it is not valid message +*/ +func TestCheckIfMessageIsForOpaPdp_Check_Message_Name(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpSubgroup = "opa" + + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message") + +} + +/* +checkIfMessageIsForOpaPdp_Check_PdpGroup +Description: Validating Message Attributes +Input: PDP message with invalid PdpGroup +Expected Output: Returning Error since it is not valid message +*/ +func TestCheckIfMessageIsForOpaPdp_Check_PdpGroup(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpSubgroup = "opa" + + pdpattributes.PdpSubgroup = "opa" + assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message") + +} + +/* +checkIfMessageIsForOpaPdp_Check_EmptyPdpGroup +Description: Validating Message Attributes +Input: PDP Group Empty +Expected Output: Returning Error since it is not valid message +*/ +func TestCheckIfMessageIsForOpaPdp_Check_EmptyPdpGroup(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "" + opapdpMessage.PdpSubgroup = "opa" + + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message") + +} + +/* +checkIfMessageIsForOpaPdp_Check_PdpSubgroup +Description: Validating Message Attributes +Input: PDP message with invalid PdpSubgroup +Expected Output: Returning Error since it is not valid message +*/ +func TestCheckIfMessageIsForOpaPdp_Check_PdpSubgroup(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpSubgroup = "opa" + + pdpattributes.PdpSubgroup = "opa" + assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "It's a valid Opa Pdp Message") + +} + +/* +checkIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup +Description: Validating Message Attributes +Input: PDP message with empty PdpSubgroup +Expected Output: Returning Error since it is not valid message +*/ +func TestCheckIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup(t *testing.T) { + + var opapdpMessage OpaPdpMessage + + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpSubgroup = "o" + + pdpattributes.PdpSubgroup = "opa" + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message") + +} diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go new file mode 100644 index 0000000..32d998f --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go @@ -0,0 +1,57 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// will process the state change message from pap and send the pdp status response. +package handler + +import ( + "encoding/json" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpstate" +) + +// Processes incoming messages indicating a PDP state change. +// This includes updating the PDP state and sending a status response when the state transitions. +func PdpStateChangeMessageHandler(message []byte, p publisher.PdpStatusSender) error { + + var pdpStateChange model.PdpStateChange + + err := json.Unmarshal(message, &pdpStateChange) + if err != nil { + log.Debugf("Failed to UnMarshal Messages: %v\n", err) + return err + } + + log.Debugf("PDP STATE CHANGE message received: %s", string(message)) + + if pdpStateChange.State != "" { + pdpstate.SetState(pdpStateChange.State) + + } + + log.Debugf("State change from PASSIVE To : %s", pdpstate.GetState()) + err = publisher.SendStateChangeResponse(p, &pdpStateChange) + if err != nil { + log.Debugf("Failed to Send State Change Response Message: %v\n", err) + return err + } + log.Infof("PDP_STATUS With State Change Message Sent Successfully") + + return nil +} diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go new file mode 100644 index 0000000..f7e8f84 --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go @@ -0,0 +1,93 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package handler + +import ( + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpstate" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockPdpStatusSender is a mock implementation of the PdpStatusSender interface +type MockPdpStatusSender struct { + mock.Mock +} + +func (m *MockPdpStatusSender) SendStateChangeResponse(p *publisher.PdpStatusSender, pdpStateChange *model.PdpStateChange) error { + args := m.Called(p, pdpStateChange) + return args.Error(0) +} + +func (m *MockPdpStatusSender) SendPdpStatus(status model.PdpStatus) error { + args := m.Called(status) + return args.Error(0) +} + +func TestPdpStateChangeMessageHandler(t *testing.T) { + + // Create a mock PdpStatusSender + mockSender := new(MockPdpStatusSender) + + // Define test cases + tests := []struct { + name string + message []byte + expectedState string + mockError error + expectError bool + }{ + { + name: "Valid state change", + message: []byte(`{"state":"ACTIVE"}`), + expectedState: "ACTIVE", + mockError: nil, + expectError: false, + }, + { + name: "Invalid JSON", + message: []byte(`{"state":}`), + mockError: nil, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set up the mock to return the expected error + mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(tt.mockError) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + // Call the handler + err := PdpStateChangeMessageHandler(tt.message, mockSender) + + // Check the results + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedState, pdpstate.GetState().String()) + } + + }) + } +} diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go new file mode 100644 index 0000000..632bcc8 --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go @@ -0,0 +1,64 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// will process the update message from pap and send the pdp status response. +package handler + +import ( + "encoding/json" + "github.com/go-playground/validator/v10" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpattributes" +) + +// Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP). +// It validates the incoming data, updates PDP attributes, and sends a response back to the sender. +func PdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error { + + var pdpUpdate model.PdpUpdate + err := json.Unmarshal(message, &pdpUpdate) + if err != nil { + log.Debugf("Failed to UnMarshal Messages: %v\n", err) + return err + } + //Initialize Validator and validate Struct after unmarshalling + validate := validator.New() + + err = validate.Struct(pdpUpdate) + if err != nil { + for _, err := range err.(validator.ValidationErrors) { + log.Infof("Field %s failed on the %s tag\n", err.Field(), err.Tag()) + } + return err + } + + log.Debugf("PDP_UPDATE Message received: %s", string(message)) + + pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup) + pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs) + + err = publisher.SendPdpUpdateResponse(p, &pdpUpdate) + if err != nil { + log.Debugf("Failed to Send Update Response Message: %v\n", err) + return err + } + log.Infof("PDP_STATUS Message Sent Successfully") + go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p) + return nil +} diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go new file mode 100644 index 0000000..061f1ce --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go @@ -0,0 +1,196 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package handler + +import ( + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" + "testing" +) + +/* +PdpUpdateMessageHandler_success +Description: Test by sending a valid input message for pdp update +Input: valid input +Expected Output: PDP Update Message should be sent sucessfully. +*/ +func TestPdpUpdateMessageHandler_Success(t *testing.T) { + + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000, + "policiesToBeDeployed":[], + "policiesToBeUndeployed":[], + "messageName":"PDP_UPDATE", + "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", + "timestampMs":1730722305297, + "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", + "pdpGroup":"defaultGroup", + "pdpSubgroup":"opa" + }` + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.NoError(t, err) + +} + +/* +PdpUpdateMessageHandler_Message_Unmarshal_Failure1 +Description: Test by sending a invalid input message which should result in a Json unmarhsal error +Input: invalid input Message by renaming params or removing certain params +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure1(t *testing.T) { + + // sending only source parameter in the message string + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0"}` + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} + +/* +PdpUpdateMessageHandler_Message_Unmarshal_Failure2 +Description: Test by sending a invalid input message which should result in a Json unmarhsal error +Input: invalid input Message by renaming params or removing certain params +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure2(t *testing.T) { + + // invlaid params by mispelling a param "source" + + messageString := `{ + "soce":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000}` + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} + +/* +PdpUpdateMessageHandler_Message_Unmarshal_Failure3 +Description: Test by sending a invalid input message which should result in a Json unmarhsal error +Input: {} +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure3(t *testing.T) { + + // invlaid params by mispelling a param "source" + + messageString := `{ + "soce:"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000}` + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} + +/* +PdpUpdateMessageHandler_Message_Unmarshal_Failure4 +Description: Test by sending a invalid input message which should result in a Json unmarhsal error +Input: empty +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Message_Unmarshal_Failure4(t *testing.T) { + + // invlaid params by mispelling a param "source" + + messageString := `""` + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} + +/* +PdpUpdateMessageHandler_Fails_Sending_PdpUpdateResponse +Description: Test by sending a invalid attribute for pdpstate which should result in a failure in sending pdp update response +Input: invalid input config set for pdpstate +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) { + + // invalid value set to pdpSubgroup -->empty "" + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":120000, + "policiesToBeDeployed":[], + "policiesToBeUndeployed":[], + "messageName":"PDP_UPDATE", + "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", + "timestampMs":1730722305297, + "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", + "pdpGroup":"defaultGroup" + }` + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error in Sending PDP Update Response")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} + +/* +PdpUpdateMessageHandler_Invalid_Starttimeinterval +Description: Test by sending a invalid time value attribute for pdpstate which should result in a failure in starting heartbeat interval +Input: invalid input message for pdpstate heartbeat interval +Expected Output: Message Handler should exit gracefully stating the error. +*/ +func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) { + + //invalid interval set to negative -1000 + messageString := `{ + "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0", + "pdpHeartbeatIntervalMs":-1000, + "policiesToBeDeployed":[], + "policiesToBeUndeployed":[], + "messageName":"PDP_UPDATE", + "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", + "timestampMs":1730722305297, + "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", + "pdpGroup":"defaultGroup", + "pdpSubgroup":"opa" + }` + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Invalid Interval Time for Heartbeat")) + + err := PdpUpdateMessageHandler([]byte(messageString), mockSender) + assert.Error(t, err) + +} diff --git a/pkg/kafkacomm/mocks/kafkaconsumerinterface.go b/pkg/kafkacomm/mocks/kafkaconsumerinterface.go new file mode 100644 index 0000000..ca5140e --- /dev/null +++ b/pkg/kafkacomm/mocks/kafkaconsumerinterface.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.46.3. DO NOT EDIT. + +package mocks + +import ( + kafka "github.com/confluentinc/confluent-kafka-go/kafka" + + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// KafkaConsumerInterface is an autogenerated mock type for the KafkaConsumerInterface type +type KafkaConsumerInterface struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *KafkaConsumerInterface) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ReadMessage provides a mock function with given fields: timeout +func (_m *KafkaConsumerInterface) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + ret := _m.Called(timeout) + + if len(ret) == 0 { + panic("no return value specified for ReadMessage") + } + + var r0 *kafka.Message + var r1 error + if rf, ok := ret.Get(0).(func(time.Duration) (*kafka.Message, error)); ok { + return rf(timeout) + } + if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok { + r0 = rf(timeout) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*kafka.Message) + } + } + + if rf, ok := ret.Get(1).(func(time.Duration) error); ok { + r1 = rf(timeout) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Unsubscribe provides a mock function with given fields: +func (_m *KafkaConsumerInterface) Unsubscribe() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Unsubscribe") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewKafkaConsumerInterface creates a new instance of KafkaConsumerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewKafkaConsumerInterface(t interface { + mock.TestingT + Cleanup(func()) +}) *KafkaConsumerInterface { + mock := &KafkaConsumerInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/kafkacomm/mocks/kafkaproducerinterface.go b/pkg/kafkacomm/mocks/kafkaproducerinterface.go new file mode 100644 index 0000000..97b6f53 --- /dev/null +++ b/pkg/kafkacomm/mocks/kafkaproducerinterface.go @@ -0,0 +1,51 @@ +// Code generated by mockery v2.46.3. DO NOT EDIT. + +package mocks + +import ( + kafka "github.com/confluentinc/confluent-kafka-go/kafka" + + mock "github.com/stretchr/testify/mock" +) + +// KafkaProducerInterface is an autogenerated mock type for the KafkaProducerInterface type +type KafkaProducerInterface struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *KafkaProducerInterface) Close() { + _m.Called() +} + +// Produce provides a mock function with given fields: _a0, _a1 +func (_m *KafkaProducerInterface) Produce(_a0 *kafka.Message, _a1 chan kafka.Event) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for Produce") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*kafka.Message, chan kafka.Event) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewKafkaProducerInterface creates a new instance of KafkaProducerInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewKafkaProducerInterface(t interface { + mock.TestingT + Cleanup(func()) +}) *KafkaProducerInterface { + mock := &KafkaProducerInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go new file mode 100644 index 0000000..4858bdf --- /dev/null +++ b/pkg/kafkacomm/pdp_topic_consumer.go @@ -0,0 +1,103 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// kafkacomm package provides a structured way to create and manage Kafka consumers, +// handle subscriptions, and read messages from Kafka topics +package kafkacomm + +import ( + "github.com/confluentinc/confluent-kafka-go/kafka" + "policy-opa-pdp/cfg" + "policy-opa-pdp/pkg/log" + "time" +) + +// KafkaConsumerInterface defines the interface for a Kafka consumer. +type KafkaConsumerInterface interface { + Close() error + Unsubscribe() error + ReadMessage(timeout time.Duration) (*kafka.Message, error) +} + +// KafkaConsumer is a wrapper around the Kafka consumer. +type KafkaConsumer struct { + Consumer KafkaConsumerInterface +} + +// Close closes the KafkaConsumer +func (kc *KafkaConsumer) Close() { + kc.Consumer.Close() +} + +// Unsubscribe unsubscribes the KafkaConsumer +func (kc *KafkaConsumer) Unsubscribe() error { + if err := kc.Consumer.Unsubscribe(); err != nil { + log.Warnf("Error Unsubscribing :%v", err) + return err + } + log.Debug("Unsubscribe From Topic") + return nil +} + +// creates a new Kafka consumer and returns it +func NewKafkaConsumer() (*KafkaConsumer, error) { + brokers := cfg.BootstrapServer + groupid := cfg.GroupId + topic := cfg.Topic + useSASL := cfg.UseSASLForKAFKA + username := cfg.KAFKA_USERNAME + password := cfg.KAFKA_PASSWORD + + // Add Kafka Connection Properties .... + configMap := &kafka.ConfigMap{ + "bootstrap.servers": brokers, + "group.id": groupid, + "auto.offset.reset": "earliest", + } + //for STRIMZI-KAFKA in case sasl is enabled + if useSASL == "true" { + configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") + configMap.SetKey("sasl.username", username) + configMap.SetKey("sasl.password", password) + configMap.SetKey("security.protocol", "SASL_PLAINTEXT") + } + + // create new Kafka Consumer + consumer, err := kafka.NewConsumer(configMap) + if err != nil { + log.Warnf("Error creating consumer: %v\n", err) + return nil, err + } + //subscribe to topic + err = consumer.SubscribeTopics([]string{topic}, nil) + if err != nil { + log.Warnf("Error subcribing to topic: %v\n", err) + return nil, err + } + log.Debugf("Topic Subscribed... : %v", topic) + return &KafkaConsumer{Consumer: consumer}, nil +} + +// gets the Kafka messages on the subscribed topic +func ReadKafkaMessages(kc *KafkaConsumer) ([]byte, error) { + msg, err := kc.Consumer.ReadMessage(-1) + if err != nil { + log.Warnf("Error reading Kafka message: %v", err) + return nil, err + } + return msg.Value, nil +} diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go new file mode 100644 index 0000000..2fdfa90 --- /dev/null +++ b/pkg/kafkacomm/pdp_topic_consumer_test.go @@ -0,0 +1,129 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package kafkacomm + +import ( + "errors" + "policy-opa-pdp/pkg/kafkacomm/mocks" + "testing" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestNewKafkaConsumer(t *testing.T) { + // Assuming configuration is correctly loaded from cfg package + // You can mock or override cfg values here if needed + + consumer, err := NewKafkaConsumer() + assert.NoError(t, err, "Expected no error when creating Kafka consumer") + assert.NotNil(t, consumer, "Expected a non-nil KafkaConsumer") + + // Clean up + if consumer != nil { + consumer.Close() + } +} + +func TestReadKafkaMessages_Success(t *testing.T) { + // Create a new mock for ConsumerInterface + mockConsumer := new(mocks.KafkaConsumerInterface) + + // Create a KafkaConsumer with the mock + kc := &KafkaConsumer{Consumer: mockConsumer} + + // Define the expected message + expectedMsg := &kafka.Message{Value: []byte("test message")} + + // Set up the mock to return the expected message + mockConsumer.On("ReadMessage", mock.Anything).Return(expectedMsg, nil) + + // Test ReadKafkaMessages + msg, err := ReadKafkaMessages(kc) + assert.NoError(t, err, "Expected no error when reading message") + assert.Equal(t, expectedMsg.Value, msg, "Expected message content to match") + + // Assert expectations + mockConsumer.AssertExpectations(t) +} + +func TestReadKafkaMessages_Error(t *testing.T) { + mockConsumer := new(mocks.KafkaConsumerInterface) + + kc := &KafkaConsumer{Consumer: mockConsumer} + + // Set up the mock to return an error + expectedErr := errors.New("read error") + mockConsumer.On("ReadMessage", mock.Anything).Return(nil, expectedErr) + + msg, err := ReadKafkaMessages(kc) + assert.Error(t, err, "Expected an error when reading message") + assert.Nil(t, msg, "Expected message to be nil on error") + + mockConsumer.AssertExpectations(t) +} + +func TestKafkaConsumer_Close(t *testing.T) { + mockConsumer := new(mocks.KafkaConsumerInterface) + + kc := &KafkaConsumer{Consumer: mockConsumer} + + // Set up the mock for Close + mockConsumer.On("Close").Return(nil) + + // Test Close method + kc.Close() + + // Verify that Close was called + mockConsumer.AssertExpectations(t) +} + +func TestKafkaConsumer_Unsubscribe(t *testing.T) { + mockConsumer := new(mocks.KafkaConsumerInterface) + + kc := &KafkaConsumer{Consumer: mockConsumer} + + // Set up the mock for Unsubscribe + mockConsumer.On("Unsubscribe").Return(nil) + + // Test Unsubscribe method + err := kc.Unsubscribe() + assert.NoError(t, err) + + // Verify that Unsubscribe was called + mockConsumer.AssertExpectations(t) +} + +func TestKafkaConsumer_Unsubscribe_Error(t *testing.T) { + mockConsumer := new(mocks.KafkaConsumerInterface) + mockError := errors.New("Unsubscribe error") + kc := &KafkaConsumer{Consumer: mockConsumer} + + // Set up the mock for Unsubscribe + mockConsumer.On("Unsubscribe").Return(mockError) + + // Test Unsubscribe method + err := kc.Unsubscribe() + assert.Error(t, err) + assert.Equal(t, mockError, err) + + // Verify that Unsubscribe was called + mockConsumer.AssertExpectations(t) +} diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go new file mode 100644 index 0000000..1b11b35 --- /dev/null +++ b/pkg/kafkacomm/pdp_topic_producer.go @@ -0,0 +1,107 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// Package kafkacomm provides utilities for producing messages to a Kafka topic +// using a configurable Kafka producer. It supports SASL authentication and +// dynamic topic configuration. +package kafkacomm + +import ( + "github.com/confluentinc/confluent-kafka-go/kafka" + "policy-opa-pdp/cfg" + "sync" + "log" +) + +type KafkaProducerInterface interface { + Produce(*kafka.Message, chan kafka.Event) error + Close() +} + +// KafkaProducer wraps a Kafka producer instance and a topic to provide +// a simple interface for producing messages. +type KafkaProducer struct { + producer KafkaProducerInterface + topic string +} + +var ( + instance *KafkaProducer + once sync.Once +) + +// GetKafkaProducer initializes and returns a KafkaProducer instance which is a singleton. +// It configures the Kafka producer with the given bootstrap servers and topic. +// If SASL authentication is enabled via the configuration, the necessary credentials +// are set in the producer configuration. +func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) { + var err error + once.Do(func() { + brokers := cfg.BootstrapServer + useSASL := cfg.UseSASLForKAFKA + username := cfg.KAFKA_USERNAME + password := cfg.KAFKA_PASSWORD + + // Add Kafka Connection Properties .... + configMap := &kafka.ConfigMap{ + "bootstrap.servers": brokers, + } + + if useSASL == "true" { + configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") + configMap.SetKey("sasl.username", username) + configMap.SetKey("sasl.password", password) + configMap.SetKey("security.protocol", "SASL_PLAINTEXT") + } + + p, err := kafka.NewProducer(configMap) + if err != nil { + return + } + instance = &KafkaProducer{ + producer: p, + topic: topic, + } + + }) + return instance, err +} + +// Produce sends a message to the configured Kafka topic. +// It takes the message payload as a byte slice and returns any errors +func (kp *KafkaProducer) Produce(message []byte) error { + kafkaMessage := &kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: kafka.PartitionAny}, + Value: []byte(message), + } + err := kp.producer.Produce(kafkaMessage, nil) + if err != nil { + return err + } + return nil +} + +// Close shuts down the Kafka producer, releasing all resources. +func (kp *KafkaProducer) Close() { + + if kp == nil || kp.producer == nil { + log.Println("KafkaProducer or producer is nil, skipping Close.") + return + } + kp.producer.Close() + log.Println("KafkaProducer closed successfully.") +} diff --git a/pkg/kafkacomm/pdp_topic_producer_test.go b/pkg/kafkacomm/pdp_topic_producer_test.go new file mode 100644 index 0000000..55f3bc8 --- /dev/null +++ b/pkg/kafkacomm/pdp_topic_producer_test.go @@ -0,0 +1,117 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package kafkacomm + +import ( + "errors" + "testing" + "time" + // "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "policy-opa-pdp/pkg/kafkacomm/mocks" // Adjust to your actual mock path +) + +func TestKafkaProducer_Produce_Success(t *testing.T) { + done := make(chan struct{}) + + go func() { + defer close(done) + // Arrange + mockProducer := new(mocks.KafkaProducerInterface) + topic := "test-topic" + kp := &KafkaProducer{ + producer: mockProducer, + topic: topic, + } + + message := []byte("test message") + + // Mock Produce method to simulate successful delivery + mockProducer.On("Produce", mock.Anything, mock.Anything).Return(nil) + + // Act + err := kp.Produce(message) + + assert.NoError(t, err) + mockProducer.AssertExpectations(t) + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("test timed out") + } + +} + +func TestKafkaProducer_Produce_Error(t *testing.T) { + // Arrange + mockProducer := new(mocks.KafkaProducerInterface) + topic := "test-topic" + kp := &KafkaProducer{ + producer: mockProducer, + topic: topic, + } + + // Simulate production error + mockProducer.On("Produce", mock.Anything, mock.Anything).Return(errors.New("produce error")) + + // Act + err := kp.Produce([]byte("test message")) + + // Assert + assert.Error(t, err) + assert.Equal(t, "produce error", err.Error()) + mockProducer.AssertExpectations(t) +} + +func TestKafkaProducer_Close(t *testing.T) { + // Arrange + mockProducer := new(mocks.KafkaProducerInterface) + kp := &KafkaProducer{ + producer: mockProducer, + } + + // Simulate successful close + mockProducer.On("Close").Return() + + // Act + kp.Close() + + // Assert + mockProducer.AssertExpectations(t) +} + +func TestKafkaProducer_Close_Error(t *testing.T) { + // Arrange + mockProducer := new(mocks.KafkaProducerInterface) + kp := &KafkaProducer{ + producer: mockProducer, + } + + // Simulate close error + mockProducer.On("Close").Return() + + // Act + kp.Close() + + // Assert + mockProducer.AssertExpectations(t) +} diff --git a/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go b/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go new file mode 100644 index 0000000..f9cc279 --- /dev/null +++ b/pkg/kafkacomm/publisher/mocks/PdpStatusSender.go @@ -0,0 +1,46 @@ +// Code generated by mockery v2.46.3. DO NOT EDIT. + +package mocks + +import ( + model "policy-opa-pdp/pkg/model" + + mock "github.com/stretchr/testify/mock" +) + +// PdpStatusSender is an autogenerated mock type for the PdpStatusSender type +type PdpStatusSender struct { + mock.Mock +} + +// SendPdpStatus provides a mock function with given fields: pdpStatus +func (_m *PdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error { + ret := _m.Called(pdpStatus) + + if len(ret) == 0 { + panic("no return value specified for SendPdpStatus") + } + + var r0 error + if rf, ok := ret.Get(0).(func(model.PdpStatus) error); ok { + r0 = rf(pdpStatus) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPdpStatusSender creates a new instance of PdpStatusSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPdpStatusSender(t interface { + mock.TestingT + Cleanup(func()) +}) *PdpStatusSender { + mock := &PdpStatusSender{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go new file mode 100644 index 0000000..f814992 --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go @@ -0,0 +1,111 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// The publisher package is responsible for managing periodic heartbeat messages for the +// Open Policy Agent (OPA) Policy Decision Point (PDP) and publishing the PDP's status to relevant channels. +// It provides functions to initialize, manage, and stop timers for sending heartbeat messages, +// ensuring the PDP communicates its health and state periodically. +package publisher + +import ( + "fmt" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpattributes" + "policy-opa-pdp/pkg/pdpstate" + "time" + + "github.com/google/uuid" +) + +var ( + ticker *time.Ticker + stopChan chan bool + currentInterval int64 +) + +// Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP. +func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { + if intervalMs <= 0 { + log.Errorf("Invalid interval provided: %d. Interval must be greater than zero.", intervalMs) + ticker = nil + return + } + + if ticker != nil && intervalMs == currentInterval { + log.Debug("Ticker is already running") + return + } + + if ticker != nil { + ticker.Stop() + } + // StopTicker() + currentInterval = intervalMs + + ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond) + log.Debugf("New Ticker %d", currentInterval) + stopChan = make(chan bool) + go func() { + for { + select { + case <-ticker.C: + sendPDPHeartBeat(s) + case <-stopChan: + ticker.Stop() + return + } + } + }() +} + +// Creates and sends a heartbeat message with the PDP's current state, health, and attributes +func sendPDPHeartBeat(s PdpStatusSender) error { + pdpStatus := model.PdpStatus{ + MessageType: model.PDP_STATUS, + PdpType: consts.PdpType, + State: pdpstate.GetState(), + Healthy: model.Healthy, + Name: pdpattributes.PdpName, + Description: "Pdp heartbeat", + PdpGroup: consts.PdpGroup, + PdpSubgroup: &pdpattributes.PdpSubgroup, + } + pdpStatus.RequestID = uuid.New().String() + pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + + err := s.SendPdpStatus(pdpStatus) + log.Debugf("Sending Heartbeat ...") + if err != nil { + log.Warnf("Error producing message: %v\n", err) + return err + } else { + return nil + } +} + +// Stops the running ticker and terminates the goroutine managing heartbeat messages. +func StopTicker() { + if ticker != nil && stopChan != nil { + stopChan <- true + close(stopChan) + ticker = nil + } else { + log.Debugf("Ticker is not Running") + } +} diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go new file mode 100644 index 0000000..f03b0eb --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go @@ -0,0 +1,135 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package publisher + +import ( + /* "fmt" + "policy-opa-pdp/cfg" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpstate"*/ + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" + "testing" + // "time" + /* "github.com/google/uuid"*/) + +var ( +// ticker *time.Ticker +// stopChan chan bool +// currentInterval int64 +) + +/* +Success Case 1 +TestStartHeartbeatIntervalTimer_ValidInterval +Description: Test starting the heartbeat interval timer with a valid interval. +Input: intervalMs = 1000 +Expected Output: The ticker starts with an interval of 1000 milliseconds, and heartbeat messages are sent at this interval. +*/ +func TestStartHeartbeatIntervalTimer_ValidInterval(t *testing.T) { + + intervalMs := int64(1000) + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + StartHeartbeatIntervalTimer(intervalMs, mockSender) + if ticker == nil { + t.Errorf("Expected ticker to be initialized") + } + if currentInterval != intervalMs { + t.Errorf("Expected currentInterval to be %d, got %d", intervalMs, currentInterval) + } +} + +/* +Failure Case 1 +TestStartHeartbeatIntervalTimer_InvalidInterval +Description: Test starting the heartbeat interval timer with an invalid interval. +Input: intervalMs = -1000 +Expected Output: The function should handle the invalid interval gracefully, possibly by logging an error message and not starting the ticker. +*/ +func TestStartHeartbeatIntervalTimer_InvalidInterval(t *testing.T) { + intervalMs := int64(-1000) + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + StartHeartbeatIntervalTimer(intervalMs, mockSender) + + if ticker != nil { + t.Log("Expected ticker to be nil for invalid interval") + } +} + +/* +TestSendPDPHeartBeat_Success 2 +Description: Test sending a heartbeat successfully. +Input: Valid pdpStatus object +Expected Output: Heartbeat message is sent successfully, and a debug log "Message sent successfully" is generated. +*/ +func TestSendPDPHeartBeat_Success(t *testing.T) { + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + err := sendPDPHeartBeat(mockSender) + assert.NoError(t, err) +} + +/* +TestSendPDPHeartBeat_Failure 2 +Description: Test failing to send a heartbeat. +Input: Invalid pdpStatus object or network failure +Expected Output: An error occurs while sending the heartbeat, and a warning log "Error producing message: ..." is generated. +*/ +func TestSendPDPHeartBeat_Failure(t *testing.T) { + // Mock SendPdpStatus to return an error + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("Error producing message")) + err := sendPDPHeartBeat(mockSender) + assert.Error(t, err) +} + +/* +TestStopTicker_Success 3 +Description: Test stopping the ticker. +Input: Ticker is running +Expected Output: The ticker stops, and the stop channel is closed. +*/ +func TestStopTicker_Success(t *testing.T) { + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + StartHeartbeatIntervalTimer(1000, mockSender) + StopTicker() + if ticker != nil { + t.Errorf("Expected ticker to be nil") + } +} + +/* +TestStopTicker_NotRunning 3 +Description: Test stopping the ticker when it is not running. +Input: Ticker is not running +Expected Output: The function should handle this case gracefully, possibly by logging a debug message indicating that the ticker is not running. +*/ +func TestStopTicker_NotRunning(t *testing.T) { + StopTicker() +} diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go new file mode 100644 index 0000000..75f22d6 --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go @@ -0,0 +1,95 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// allows to send the pdp registartion message with unique transaction id and timestamp to topic +package publisher + +import ( + "encoding/json" + "fmt" + "github.com/google/uuid" + "policy-opa-pdp/cfg" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpattributes" + "time" +) + +type PdpStatusSender interface { + SendPdpStatus(pdpStatus model.PdpStatus) error +} + +type RealPdpStatusSender struct{} + +// Sends PdpSTatus Message type to KafkaTopic +func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error { + + var topic string + bootstrapServers := cfg.BootstrapServer + topic = cfg.Topic + pdpStatus.RequestID = uuid.New().String() + pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + + jsonMessage, err := json.Marshal(pdpStatus) + if err != nil { + log.Warnf("failed to marshal PdpStatus to JSON: %v", err) + return err + } + + producer, err := kafkacomm.GetKafkaProducer(bootstrapServers, topic) + if err != nil { + log.Warnf("Error creating Kafka producer: %v\n", err) + return err + } + + err = producer.Produce(jsonMessage) + if err != nil { + log.Warnf("Error producing message: %v\n", err) + } else { + log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage)) + } + + return nil +} + +// sends the registartion message to topic using SendPdpStatus(pdpStatus) +func SendPdpPapRegistration(s PdpStatusSender) error { + + var pdpStatus = model.PdpStatus{ + MessageType: model.PDP_STATUS, + PdpType: consts.PdpType, + State: model.Passive, + Healthy: model.Healthy, + Policies: nil, + PdpResponse: nil, + Name: pdpattributes.PdpName, + Description: "Pdp Status Registration Message", + PdpGroup: consts.PdpGroup, + } + + log.Debugf("Sending PDP PAP Registration Message") + + err := s.SendPdpStatus(pdpStatus) + if err != nil { + log.Warnf("Error producing message: %v\n", err) + return err + } + return nil + +} diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go new file mode 100644 index 0000000..03749de --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go @@ -0,0 +1,58 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package publisher + +import ( + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" + "policy-opa-pdp/pkg/model" + "testing" +) + +type MockPdpStatusSender struct { + mock.Mock +} + +func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error { + return m.Called(pdpStatus).Error(0) + +} + +func TestSendPdpPapRegistration_Success(t *testing.T) { + mockSender := new(mocks.PdpStatusSender) + + mockSender.On("SendPdpStatus", mock.AnythingOfType("model.PdpStatus")).Return(nil) + + err := SendPdpPapRegistration(mockSender) + assert.NoError(t, err) + mockSender.AssertCalled(t, "SendPdpStatus", mock.AnythingOfType("model.PdpStatus")) +} + +func TestSendPdpPapRegistration_Failure(t *testing.T) { + mockSender := new(mocks.PdpStatusSender) + + mockSender.On("SendPdpStatus", mock.AnythingOfType("model.PdpStatus")).Return(errors.New("failed To Send")) + + err := SendPdpPapRegistration(mockSender) + assert.Error(t, err, "Expected an error for failure") + assert.EqualError(t, err, "failed To Send", "Error messages should match") + mockSender.AssertCalled(t, "SendPdpStatus", mock.AnythingOfType("model.PdpStatus")) +} diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go new file mode 100644 index 0000000..756d0f2 --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go @@ -0,0 +1,109 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +// responsible for sending PDP_STATUS messages in response to specific events +// such as updates (PDP_UPDATE) or state changes (PDP_STATE_CHANGE). These responses provide details +// about the current state, health, and attributes of the Policy Decision Point (PDP). +package publisher + +import ( + "fmt" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" + "policy-opa-pdp/pkg/pdpattributes" + "policy-opa-pdp/pkg/pdpstate" + "time" + + "github.com/google/uuid" +) + +// Sends a PDP_STATUS message to indicate the successful processing of a PDP_UPDATE request +// received from the Policy Administration Point (PAP). +func SendPdpUpdateResponse(s PdpStatusSender, pdpUpdate *model.PdpUpdate) error { + + responseStatus := model.Success + responseMessage := "PDP Update was Successful" + + pdpStatus := model.PdpStatus{ + MessageType: model.PDP_STATUS, + PdpType: consts.PdpType, + State: pdpstate.State, + Healthy: model.Healthy, + Name: pdpattributes.PdpName, + Description: "Pdp Status Response Message For Pdp Update", + PdpGroup: consts.PdpGroup, + PdpSubgroup: &pdpattributes.PdpSubgroup, + // Policies: [], + PdpResponse: &model.PdpResponseDetails{ + ResponseTo: &pdpUpdate.RequestId, + ResponseStatus: &responseStatus, + ResponseMessage: &responseMessage, + }, + } + + pdpStatus.RequestID = uuid.New().String() + pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + + log.Infof("Sending PDP Status With Update Response") + + err := s.SendPdpStatus(pdpStatus) + if err != nil { + log.Warnf("Failed to send PDP Update Message : %v", err) + return err + } + + return nil + +} + +// Sends a PDP_STATUS message to indicate a state change in the PDP (e.g., from PASSIVE to ACTIVE). +func SendStateChangeResponse(s PdpStatusSender, pdpStateChange *model.PdpStateChange) error { + + responseStatus := model.Success + responseMessage := "PDP State Changed From PASSIVE TO Active" + pdpStatus := model.PdpStatus{ + MessageType: model.PDP_STATUS, + PdpType: consts.PdpType, + State: pdpstate.GetState(), + Healthy: model.Healthy, + Name: pdpattributes.PdpName, + Description: "Pdp Status Response Message to Pdp State Change", + PdpGroup: consts.PdpGroup, + PdpSubgroup: &pdpattributes.PdpSubgroup, + // Policies: [], + PdpResponse: &model.PdpResponseDetails{ + ResponseTo: &pdpStateChange.RequestId, + ResponseStatus: &responseStatus, + ResponseMessage: &responseMessage, + }, + } + + pdpStatus.RequestID = uuid.New().String() + pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli()) + + log.Infof("Sending PDP Status With State Change response") + + err := s.SendPdpStatus(pdpStatus) + if err != nil { + log.Warnf("Failed to send PDP Update Message : %v", err) + return err + } + + return nil +} diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go new file mode 100644 index 0000000..5e02704 --- /dev/null +++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go @@ -0,0 +1,83 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package publisher + +import ( + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" + "policy-opa-pdp/pkg/model" + "testing" +) + +// TestSendPdpUpdateResponse_Success tests SendPdpUpdateResponse for a successful response +func TestSendPdpUpdateResponse_Success(t *testing.T) { + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"} + + err := SendPdpUpdateResponse(mockSender, pdpUpdate) + assert.NoError(t, err) + mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything) +} + +// TestSendPdpUpdateResponse_Failure tests SendPdpUpdateResponse when SendPdpStatus fails +func TestSendPdpUpdateResponse_Failure(t *testing.T) { + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("mock send error")) + + pdpUpdate := &model.PdpUpdate{RequestId: "test-request-id"} + + err := SendPdpUpdateResponse(mockSender, pdpUpdate) + + assert.Error(t, err) + + mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything) +} + +// TestSendStateChangeResponse_Success tests SendStateChangeResponse for a successful state change response +func TestSendStateChangeResponse_Success(t *testing.T) { + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(nil) + + pdpStateChange := &model.PdpStateChange{RequestId: "test-state-change-id"} + + err := SendStateChangeResponse(mockSender, pdpStateChange) + + assert.NoError(t, err) + mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything) +} + +// TestSendStateChangeResponse_Failure tests SendStateChangeResponse when SendPdpStatus fails +func TestSendStateChangeResponse_Failure(t *testing.T) { + + mockSender := new(mocks.PdpStatusSender) + mockSender.On("SendPdpStatus", mock.Anything).Return(errors.New("mock send error")) + + pdpStateChange := &model.PdpStateChange{RequestId: "test-state-change-id"} + + err := SendStateChangeResponse(mockSender, pdpStateChange) + assert.Error(t, err) + mockSender.AssertCalled(t, "SendPdpStatus", mock.Anything) + +} |