aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/kafkacomm
diff options
context:
space:
mode:
authorShalini Shivam <ss00765416@techmahindra.com>2024-12-13 15:56:10 +0100
committerShalini Shivam <ss00765416@techmahindra.com>2024-12-16 11:09:20 +0100
commit83f8646f702f9cffbd25d8124476465ee8f94af0 (patch)
tree7ff6b73eaf973a16bdbc9f6b5fa33d2a2d9727f5 /pkg/kafkacomm
parentd2d039fc4525943ea0d16e49e77c830a8e5c0ecc (diff)
Support Output response to OPA query
Description : For details refer https://lf-onap.atlassian.net/wiki/spaces/DW/pages/51150925/OPA+PDP Issue-ID: POLICY-5204 Change-Id: Id6d51fa83957fb560afec2d85cc0d45d6dda6900 Signed-off-by: Shalini Shivam <ss00765416@techmahindra.com>
Diffstat (limited to 'pkg/kafkacomm')
-rw-r--r--pkg/kafkacomm/handler/pdp_message_handler.go99
-rw-r--r--pkg/kafkacomm/handler/pdp_message_handler_test.go61
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler.go3
-rw-r--r--pkg/kafkacomm/handler/pdp_state_change_handler_test.go3
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler.go3
-rw-r--r--pkg/kafkacomm/handler/pdp_update_message_handler_test.go10
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer.go120
-rw-r--r--pkg/kafkacomm/pdp_topic_consumer_test.go3
-rw-r--r--pkg/kafkacomm/pdp_topic_producer.go5
-rw-r--r--pkg/kafkacomm/pdp_topic_producer_test.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat.go5
-rw-r--r--pkg/kafkacomm/publisher/pdp-heartbeat_test.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-pap-registration_test.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher.go3
-rw-r--r--pkg/kafkacomm/publisher/pdp-status-publisher_test.go3
16 files changed, 229 insertions, 101 deletions
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go
index 8d7da92..8d1b9b4 100644
--- a/pkg/kafkacomm/handler/pdp_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_message_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// The handler package is responsible for processing messages from Kafka, specifically targeting the OPA
@@ -22,14 +23,35 @@
package handler
import (
+ "context"
"encoding/json"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/pdpattributes"
+ "sync"
)
+var (
+ shutdownFlag bool
+ mu sync.Mutex
+)
+
+// SetShutdownFlag sets the shutdown flag
+func SetShutdownFlag() {
+ mu.Lock()
+ shutdownFlag = true
+ mu.Unlock()
+}
+
+// IsShutdown checks if the consumer has already been shut down
+func IsShutdown() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return shutdownFlag
+}
+
type OpaPdpMessage struct {
Name string `json:"name"` // Name of the PDP (optional for broadcast messages).
MessageType string `json:"MessageName"` // Type of the message (e.g., PDP_UPDATE, PDP_STATE_CHANGE, etc.)
@@ -74,58 +96,65 @@ func checkIfMessageIsForOpaPdp(message OpaPdpMessage) bool {
// Handles incoming Kafka messages, validates their relevance to the current PDP,
// and dispatches them for further processing based on their type.
-func PdpMessageHandler(kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
+func PdpMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
log.Debug("Starting PDP Message Listener.....")
var stopConsuming bool
for !stopConsuming {
- message, err := kafkacomm.ReadKafkaMessages(kc)
- if err != nil {
- log.Warnf("Failed to Read Kafka Messages: %v\n", err)
- continue
- }
- log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
-
- if message != nil {
-
- var opaPdpMessage OpaPdpMessage
-
- err = json.Unmarshal(message, &opaPdpMessage)
+ select {
+ case <-ctx.Done():
+ log.Debug("Stopping PDP Listener.....")
+ return nil
+ stopConsuming = true ///Loop Exits
+ default:
+ message, err := kafkacomm.ReadKafkaMessages(kc)
if err != nil {
- log.Warnf("Failed to UnMarshal Messages: %v\n", err)
continue
}
+ log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
- if !checkIfMessageIsForOpaPdp(opaPdpMessage) {
-
- log.Warnf("Not a valid Opa Pdp Message")
- continue
- }
+ if message != nil {
- switch opaPdpMessage.MessageType {
+ var opaPdpMessage OpaPdpMessage
- case "PDP_UPDATE":
- err = PdpUpdateMessageHandler(message, p)
+ err = json.Unmarshal(message, &opaPdpMessage)
if err != nil {
- log.Warnf("Error processing Update Message: %v", err)
+ log.Warnf("Failed to UnMarshal Messages: %v\n", err)
+ continue
}
- case "PDP_STATE_CHANGE":
- err = PdpStateChangeMessageHandler(message, p)
- if err != nil {
- log.Warnf("Error processing Update Message: %v", err)
+ if !checkIfMessageIsForOpaPdp(opaPdpMessage) {
+
+ log.Warnf("Not a valid Opa Pdp Message")
+ continue
}
- case "PDP_STATUS":
- log.Debugf("discarding event of type PDP_STATUS")
- continue
- default:
- log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType)
- continue
+ switch opaPdpMessage.MessageType {
- }
+ case "PDP_UPDATE":
+ err = PdpUpdateMessageHandler(message, p)
+ if err != nil {
+ log.Warnf("Error processing Update Message: %v", err)
+ }
+ case "PDP_STATE_CHANGE":
+ err = PdpStateChangeMessageHandler(message, p)
+ if err != nil {
+ log.Warnf("Error processing Update Message: %v", err)
+ }
+
+ case "PDP_STATUS":
+ log.Debugf("discarding event of type PDP_STATUS")
+ continue
+ default:
+ log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType)
+ continue
+
+ }
+
+ }
}
+
}
return nil
diff --git a/pkg/kafkacomm/handler/pdp_message_handler_test.go b/pkg/kafkacomm/handler/pdp_message_handler_test.go
index 3764c9e..8ba1e0e 100644
--- a/pkg/kafkacomm/handler/pdp_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_message_handler_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
@@ -20,8 +21,13 @@ package handler
import (
"github.com/stretchr/testify/assert"
+ "policy-opa-pdp/consts"
"policy-opa-pdp/pkg/pdpattributes"
"testing"
+ // "context"
+ // "encoding/json"
+ // "errors"
+ // "policy-opa-pdp/pkg/kafkacomm/mocks"
)
/*
@@ -36,7 +42,7 @@ func TestCheckIfMessageIsForOpaPdp_Check(t *testing.T) {
opapdpMessage.Name = "opa-3a318049-813f-4172-b4d3-7d4f466e5b80"
opapdpMessage.MessageType = "PDP_STATUS"
- opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpGroup = "opaGroup"
opapdpMessage.PdpSubgroup = "opa"
assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message")
@@ -55,7 +61,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_Message_Name(t *testing.T) {
opapdpMessage.Name = ""
opapdpMessage.MessageType = "PDP_STATUS"
- opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpGroup = "opaGroup"
opapdpMessage.PdpSubgroup = "opa"
assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message")
@@ -74,7 +80,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_PdpGroup(t *testing.T) {
opapdpMessage.Name = ""
opapdpMessage.MessageType = "PDP_STATUS"
- opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpGroup = "opaGroup"
opapdpMessage.PdpSubgroup = "opa"
pdpattributes.PdpSubgroup = "opa"
@@ -113,7 +119,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_PdpSubgroup(t *testing.T) {
opapdpMessage.Name = ""
opapdpMessage.MessageType = "PDP_STATUS"
- opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpGroup = "opaGroup"
opapdpMessage.PdpSubgroup = "opa"
pdpattributes.PdpSubgroup = "opa"
@@ -133,10 +139,53 @@ func TestCheckIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup(t *testing.T) {
opapdpMessage.Name = ""
opapdpMessage.MessageType = "PDP_STATUS"
- opapdpMessage.PdpGroup = "defaultGroup"
+ opapdpMessage.PdpGroup = "opaGroup"
opapdpMessage.PdpSubgroup = "o"
pdpattributes.PdpSubgroup = "opa"
assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message")
}
+
+func TestCheckIfMessageIsForOpaPdp_EmptyPdpSubgroupAndGroup(t *testing.T) {
+ var opapdpMessage OpaPdpMessage
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = ""
+ opapdpMessage.PdpSubgroup = ""
+
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Message should be invalid when PdpGroup and PdpSubgroup are empty")
+}
+
+func TestCheckIfMessageIsForOpaPdp_ValidBroadcastMessage(t *testing.T) {
+ var opapdpMessage OpaPdpMessage
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_UPDATE"
+ opapdpMessage.PdpGroup = "opaGroup"
+ opapdpMessage.PdpSubgroup = ""
+
+ pdpattributes.PdpSubgroup = "opa"
+ consts.PdpGroup = "opaGroup"
+
+ assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Valid broadcast message should pass the check")
+}
+
+func TestCheckIfMessageIsForOpaPdp_InvalidGroupMismatch(t *testing.T) {
+ var opapdpMessage OpaPdpMessage
+ opapdpMessage.Name = ""
+ opapdpMessage.MessageType = "PDP_STATUS"
+ opapdpMessage.PdpGroup = "wrongGroup"
+ opapdpMessage.PdpSubgroup = ""
+
+ consts.PdpGroup = "opaGroup"
+
+ assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Message with mismatched PdpGroup should fail")
+}
+
+// Test SetShutdownFlag and IsShutdown
+func TestSetAndCheckShutdownFlag(t *testing.T) {
+ assert.False(t, IsShutdown(), "Shutdown flag should be false initially")
+
+ SetShutdownFlag()
+ assert.True(t, IsShutdown(), "Shutdown flag should be true after calling SetShutdownFlag")
+}
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go
index 32d998f..2de89ff 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// will process the state change message from pap and send the pdp status response.
diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
index f7e8f84..67edd6f 100644
--- a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go
index 632bcc8..efe115c 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// will process the update message from pap and send the pdp status response.
diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
index 061f1ce..4d5d7dc 100644
--- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
+++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go
@@ -1,6 +1,5 @@
-// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
@@ -43,7 +43,7 @@ func TestPdpUpdateMessageHandler_Success(t *testing.T) {
"requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
"timestampMs":1730722305297,
"name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
- "pdpGroup":"defaultGroup",
+ "pdpGroup":"opaGroup",
"pdpSubgroup":"opa"
}`
@@ -154,7 +154,7 @@ func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) {
"requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
"timestampMs":1730722305297,
"name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
- "pdpGroup":"defaultGroup"
+ "pdpGroup":"opaGroup"
}`
mockSender := new(mocks.PdpStatusSender)
@@ -183,7 +183,7 @@ func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) {
"requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
"timestampMs":1730722305297,
"name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059",
- "pdpGroup":"defaultGroup",
+ "pdpGroup":"opaGroup",
"pdpSubgroup":"opa"
}`
diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go
index 4858bdf..3d19e6c 100644
--- a/pkg/kafkacomm/pdp_topic_consumer.go
+++ b/pkg/kafkacomm/pdp_topic_consumer.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// kafkacomm package provides a structured way to create and manage Kafka consumers,
@@ -20,12 +21,20 @@
package kafkacomm
import (
+ "fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/log"
+ "sync"
"time"
)
+var (
+ // Declare a global variable to hold the singleton KafkaConsumer
+ consumerInstance *KafkaConsumer
+ consumerOnce sync.Once // sync.Once ensures that the consumer is created only once
+)
+
// KafkaConsumerInterface defines the interface for a Kafka consumer.
type KafkaConsumerInterface interface {
Close() error
@@ -40,63 +49,92 @@ type KafkaConsumer struct {
// Close closes the KafkaConsumer
func (kc *KafkaConsumer) Close() {
- kc.Consumer.Close()
+ if kc.Consumer != nil {
+ kc.Consumer.Close()
+ }
}
// Unsubscribe unsubscribes the KafkaConsumer
func (kc *KafkaConsumer) Unsubscribe() error {
- if err := kc.Consumer.Unsubscribe(); err != nil {
- log.Warnf("Error Unsubscribing :%v", err)
+ if kc.Consumer == nil {
+ return fmt.Errorf("Kafka Consumer is nil so cannot Unsubscribe")
+ }
+ err := kc.Consumer.Unsubscribe()
+ if err != nil {
+ log.Warnf("Error Unsubscribing: %v", err)
return err
}
- log.Debug("Unsubscribe From Topic")
+ log.Debug("Unsubscribed From Topic")
return nil
}
-// creates a new Kafka consumer and returns it
+// NewKafkaConsumer creates a new Kafka consumer and returns it
func NewKafkaConsumer() (*KafkaConsumer, error) {
- brokers := cfg.BootstrapServer
- groupid := cfg.GroupId
- topic := cfg.Topic
- useSASL := cfg.UseSASLForKAFKA
- username := cfg.KAFKA_USERNAME
- password := cfg.KAFKA_PASSWORD
+ // Initialize the consumer instance only once
+ consumerOnce.Do(func() {
+ log.Debugf("Creating Kafka Consumer singleton instance")
+ brokers := cfg.BootstrapServer
+ groupid := cfg.GroupId
+ topic := cfg.Topic
+ useSASL := cfg.UseSASLForKAFKA
+ username := cfg.KAFKA_USERNAME
+ password := cfg.KAFKA_PASSWORD
- // Add Kafka Connection Properties ....
- configMap := &kafka.ConfigMap{
- "bootstrap.servers": brokers,
- "group.id": groupid,
- "auto.offset.reset": "earliest",
- }
- //for STRIMZI-KAFKA in case sasl is enabled
- if useSASL == "true" {
- configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")
- configMap.SetKey("sasl.username", username)
- configMap.SetKey("sasl.password", password)
- configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
- }
+ // Add Kafka connection properties
+ configMap := &kafka.ConfigMap{
+ "bootstrap.servers": brokers,
+ "group.id": groupid,
+ "auto.offset.reset": "latest",
+ }
- // create new Kafka Consumer
- consumer, err := kafka.NewConsumer(configMap)
- if err != nil {
- log.Warnf("Error creating consumer: %v\n", err)
- return nil, err
- }
- //subscribe to topic
- err = consumer.SubscribeTopics([]string{topic}, nil)
- if err != nil {
- log.Warnf("Error subcribing to topic: %v\n", err)
- return nil, err
+ // If SASL is enabled, add SASL properties
+ if useSASL == "true" {
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")
+ configMap.SetKey("sasl.username", username)
+ configMap.SetKey("sasl.password", password)
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT")
+ configMap.SetKey("session.timeout.ms", "30000")
+ configMap.SetKey("max.poll.interval.ms", "300000")
+ configMap.SetKey("enable.partition.eof", true)
+ configMap.SetKey("enable.auto.commit", true)
+ // configMap.SetKey("debug", "all") // Uncomment for debug
+ }
+
+ // Create a new Kafka consumer
+ consumer, err := kafka.NewConsumer(configMap)
+ if err != nil {
+ log.Warnf("Error creating consumer: %v", err)
+ return
+ }
+ if consumer == nil {
+ log.Warnf("Kafka Consumer is nil after creation")
+ return
+ }
+
+ // Subscribe to the topic
+ err = consumer.SubscribeTopics([]string{topic}, nil)
+ if err != nil {
+ log.Warnf("Error subscribing to topic: %v", err)
+ return
+ }
+ log.Debugf("Topic Subscribed: %v", topic)
+
+ // Assign the consumer instance
+ consumerInstance = &KafkaConsumer{Consumer: consumer}
+ log.Debugf("Created SIngleton consumer instance")
+ })
+
+ // Return the singleton consumer instance
+ if consumerInstance == nil {
+ return nil, fmt.Errorf("Kafka Consumer instance not created")
}
- log.Debugf("Topic Subscribed... : %v", topic)
- return &KafkaConsumer{Consumer: consumer}, nil
+ return consumerInstance, nil
}
-// gets the Kafka messages on the subscribed topic
+// ReadKafkaMessages gets the Kafka messages on the subscribed topic
func ReadKafkaMessages(kc *KafkaConsumer) ([]byte, error) {
- msg, err := kc.Consumer.ReadMessage(-1)
+ msg, err := kc.Consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
- log.Warnf("Error reading Kafka message: %v", err)
return nil, err
}
return msg.Value, nil
diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go
index 2fdfa90..9feeeaa 100644
--- a/pkg/kafkacomm/pdp_topic_consumer_test.go
+++ b/pkg/kafkacomm/pdp_topic_consumer_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go
index 1b11b35..d8edb0b 100644
--- a/pkg/kafkacomm/pdp_topic_producer.go
+++ b/pkg/kafkacomm/pdp_topic_producer.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// Package kafkacomm provides utilities for producing messages to a Kafka topic
@@ -22,9 +23,9 @@ package kafkacomm
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
+ "log"
"policy-opa-pdp/cfg"
"sync"
- "log"
)
type KafkaProducerInterface interface {
diff --git a/pkg/kafkacomm/pdp_topic_producer_test.go b/pkg/kafkacomm/pdp_topic_producer_test.go
index 55f3bc8..3379845 100644
--- a/pkg/kafkacomm/pdp_topic_producer_test.go
+++ b/pkg/kafkacomm/pdp_topic_producer_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go
index f814992..fbd07d6 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// The publisher package is responsible for managing periodic heartbeat messages for the
@@ -55,7 +56,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
if ticker != nil {
ticker.Stop()
}
- // StopTicker()
+ // StopTicker()
currentInterval = intervalMs
ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
index f03b0eb..e95866e 100644
--- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
+++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go
index 75f22d6..54b12ea 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
// allows to send the pdp registartion message with unique transaction id and timestamp to topic
diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
index 03749de..725b4b9 100644
--- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
+++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go
index 756d0f2..4a13b1c 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//
diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
index 5e02704..83154ca 100644
--- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
+++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go
@@ -1,6 +1,6 @@
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telecom
+// Copyright (C) 2024: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
// ========================LICENSE_END===================================
//