diff options
author | Shalini Shivam <ss00765416@techmahindra.com> | 2024-12-13 15:56:10 +0100 |
---|---|---|
committer | Shalini Shivam <ss00765416@techmahindra.com> | 2024-12-16 11:09:20 +0100 |
commit | 83f8646f702f9cffbd25d8124476465ee8f94af0 (patch) | |
tree | 7ff6b73eaf973a16bdbc9f6b5fa33d2a2d9727f5 /pkg/kafkacomm | |
parent | d2d039fc4525943ea0d16e49e77c830a8e5c0ecc (diff) |
Support Output response to OPA query
Description : For details refer https://lf-onap.atlassian.net/wiki/spaces/DW/pages/51150925/OPA+PDP
Issue-ID: POLICY-5204
Change-Id: Id6d51fa83957fb560afec2d85cc0d45d6dda6900
Signed-off-by: Shalini Shivam <ss00765416@techmahindra.com>
Diffstat (limited to 'pkg/kafkacomm')
-rw-r--r-- | pkg/kafkacomm/handler/pdp_message_handler.go | 99 | ||||
-rw-r--r-- | pkg/kafkacomm/handler/pdp_message_handler_test.go | 61 | ||||
-rw-r--r-- | pkg/kafkacomm/handler/pdp_state_change_handler.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/handler/pdp_state_change_handler_test.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/handler/pdp_update_message_handler.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/handler/pdp_update_message_handler_test.go | 10 | ||||
-rw-r--r-- | pkg/kafkacomm/pdp_topic_consumer.go | 120 | ||||
-rw-r--r-- | pkg/kafkacomm/pdp_topic_consumer_test.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/pdp_topic_producer.go | 5 | ||||
-rw-r--r-- | pkg/kafkacomm/pdp_topic_producer_test.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-heartbeat.go | 5 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-heartbeat_test.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-pap-registration.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-pap-registration_test.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-status-publisher.go | 3 | ||||
-rw-r--r-- | pkg/kafkacomm/publisher/pdp-status-publisher_test.go | 3 |
16 files changed, 229 insertions, 101 deletions
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go index 8d7da92..8d1b9b4 100644 --- a/pkg/kafkacomm/handler/pdp_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_message_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // The handler package is responsible for processing messages from Kafka, specifically targeting the OPA @@ -22,14 +23,35 @@ package handler import ( + "context" "encoding/json" "policy-opa-pdp/consts" "policy-opa-pdp/pkg/kafkacomm" "policy-opa-pdp/pkg/kafkacomm/publisher" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/pdpattributes" + "sync" ) +var ( + shutdownFlag bool + mu sync.Mutex +) + +// SetShutdownFlag sets the shutdown flag +func SetShutdownFlag() { + mu.Lock() + shutdownFlag = true + mu.Unlock() +} + +// IsShutdown checks if the consumer has already been shut down +func IsShutdown() bool { + mu.Lock() + defer mu.Unlock() + return shutdownFlag +} + type OpaPdpMessage struct { Name string `json:"name"` // Name of the PDP (optional for broadcast messages). MessageType string `json:"MessageName"` // Type of the message (e.g., PDP_UPDATE, PDP_STATE_CHANGE, etc.) @@ -74,58 +96,65 @@ func checkIfMessageIsForOpaPdp(message OpaPdpMessage) bool { // Handles incoming Kafka messages, validates their relevance to the current PDP, // and dispatches them for further processing based on their type. -func PdpMessageHandler(kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { +func PdpMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { log.Debug("Starting PDP Message Listener.....") var stopConsuming bool for !stopConsuming { - message, err := kafkacomm.ReadKafkaMessages(kc) - if err != nil { - log.Warnf("Failed to Read Kafka Messages: %v\n", err) - continue - } - log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) - - if message != nil { - - var opaPdpMessage OpaPdpMessage - - err = json.Unmarshal(message, &opaPdpMessage) + select { + case <-ctx.Done(): + log.Debug("Stopping PDP Listener.....") + return nil + stopConsuming = true ///Loop Exits + default: + message, err := kafkacomm.ReadKafkaMessages(kc) if err != nil { - log.Warnf("Failed to UnMarshal Messages: %v\n", err) continue } + log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) - if !checkIfMessageIsForOpaPdp(opaPdpMessage) { - - log.Warnf("Not a valid Opa Pdp Message") - continue - } + if message != nil { - switch opaPdpMessage.MessageType { + var opaPdpMessage OpaPdpMessage - case "PDP_UPDATE": - err = PdpUpdateMessageHandler(message, p) + err = json.Unmarshal(message, &opaPdpMessage) if err != nil { - log.Warnf("Error processing Update Message: %v", err) + log.Warnf("Failed to UnMarshal Messages: %v\n", err) + continue } - case "PDP_STATE_CHANGE": - err = PdpStateChangeMessageHandler(message, p) - if err != nil { - log.Warnf("Error processing Update Message: %v", err) + if !checkIfMessageIsForOpaPdp(opaPdpMessage) { + + log.Warnf("Not a valid Opa Pdp Message") + continue } - case "PDP_STATUS": - log.Debugf("discarding event of type PDP_STATUS") - continue - default: - log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType) - continue + switch opaPdpMessage.MessageType { - } + case "PDP_UPDATE": + err = PdpUpdateMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + case "PDP_STATE_CHANGE": + err = PdpStateChangeMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + + case "PDP_STATUS": + log.Debugf("discarding event of type PDP_STATUS") + continue + default: + log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType) + continue + + } + + } } + } return nil diff --git a/pkg/kafkacomm/handler/pdp_message_handler_test.go b/pkg/kafkacomm/handler/pdp_message_handler_test.go index 3764c9e..8ba1e0e 100644 --- a/pkg/kafkacomm/handler/pdp_message_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_message_handler_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // @@ -20,8 +21,13 @@ package handler import ( "github.com/stretchr/testify/assert" + "policy-opa-pdp/consts" "policy-opa-pdp/pkg/pdpattributes" "testing" + // "context" + // "encoding/json" + // "errors" + // "policy-opa-pdp/pkg/kafkacomm/mocks" ) /* @@ -36,7 +42,7 @@ func TestCheckIfMessageIsForOpaPdp_Check(t *testing.T) { opapdpMessage.Name = "opa-3a318049-813f-4172-b4d3-7d4f466e5b80" opapdpMessage.MessageType = "PDP_STATUS" - opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpGroup = "opaGroup" opapdpMessage.PdpSubgroup = "opa" assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Its a valid Opa Pdp Message") @@ -55,7 +61,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_Message_Name(t *testing.T) { opapdpMessage.Name = "" opapdpMessage.MessageType = "PDP_STATUS" - opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpGroup = "opaGroup" opapdpMessage.PdpSubgroup = "opa" assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message") @@ -74,7 +80,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_PdpGroup(t *testing.T) { opapdpMessage.Name = "" opapdpMessage.MessageType = "PDP_STATUS" - opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpGroup = "opaGroup" opapdpMessage.PdpSubgroup = "opa" pdpattributes.PdpSubgroup = "opa" @@ -113,7 +119,7 @@ func TestCheckIfMessageIsForOpaPdp_Check_PdpSubgroup(t *testing.T) { opapdpMessage.Name = "" opapdpMessage.MessageType = "PDP_STATUS" - opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpGroup = "opaGroup" opapdpMessage.PdpSubgroup = "opa" pdpattributes.PdpSubgroup = "opa" @@ -133,10 +139,53 @@ func TestCheckIfMessageIsForOpaPdp_Check_IncorrectPdpSubgroup(t *testing.T) { opapdpMessage.Name = "" opapdpMessage.MessageType = "PDP_STATUS" - opapdpMessage.PdpGroup = "defaultGroup" + opapdpMessage.PdpGroup = "opaGroup" opapdpMessage.PdpSubgroup = "o" pdpattributes.PdpSubgroup = "opa" assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Not a valid Opa Pdp Message") } + +func TestCheckIfMessageIsForOpaPdp_EmptyPdpSubgroupAndGroup(t *testing.T) { + var opapdpMessage OpaPdpMessage + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "" + opapdpMessage.PdpSubgroup = "" + + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Message should be invalid when PdpGroup and PdpSubgroup are empty") +} + +func TestCheckIfMessageIsForOpaPdp_ValidBroadcastMessage(t *testing.T) { + var opapdpMessage OpaPdpMessage + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_UPDATE" + opapdpMessage.PdpGroup = "opaGroup" + opapdpMessage.PdpSubgroup = "" + + pdpattributes.PdpSubgroup = "opa" + consts.PdpGroup = "opaGroup" + + assert.True(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Valid broadcast message should pass the check") +} + +func TestCheckIfMessageIsForOpaPdp_InvalidGroupMismatch(t *testing.T) { + var opapdpMessage OpaPdpMessage + opapdpMessage.Name = "" + opapdpMessage.MessageType = "PDP_STATUS" + opapdpMessage.PdpGroup = "wrongGroup" + opapdpMessage.PdpSubgroup = "" + + consts.PdpGroup = "opaGroup" + + assert.False(t, checkIfMessageIsForOpaPdp(opapdpMessage), "Message with mismatched PdpGroup should fail") +} + +// Test SetShutdownFlag and IsShutdown +func TestSetAndCheckShutdownFlag(t *testing.T) { + assert.False(t, IsShutdown(), "Shutdown flag should be false initially") + + SetShutdownFlag() + assert.True(t, IsShutdown(), "Shutdown flag should be true after calling SetShutdownFlag") +} diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler.go b/pkg/kafkacomm/handler/pdp_state_change_handler.go index 32d998f..2de89ff 100644 --- a/pkg/kafkacomm/handler/pdp_state_change_handler.go +++ b/pkg/kafkacomm/handler/pdp_state_change_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // will process the state change message from pap and send the pdp status response. diff --git a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go index f7e8f84..67edd6f 100644 --- a/pkg/kafkacomm/handler/pdp_state_change_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_state_change_handler_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go index 632bcc8..efe115c 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // will process the update message from pap and send the pdp status response. diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go index 061f1ce..4d5d7dc 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler_test.go @@ -1,6 +1,5 @@ -// - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // @@ -43,7 +43,7 @@ func TestPdpUpdateMessageHandler_Success(t *testing.T) { "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", "timestampMs":1730722305297, "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", - "pdpGroup":"defaultGroup", + "pdpGroup":"opaGroup", "pdpSubgroup":"opa" }` @@ -154,7 +154,7 @@ func TestPdpUpdateMessageHandler_Fails_Sending_UpdateResponse(t *testing.T) { "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", "timestampMs":1730722305297, "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", - "pdpGroup":"defaultGroup" + "pdpGroup":"opaGroup" }` mockSender := new(mocks.PdpStatusSender) @@ -183,7 +183,7 @@ func TestPdpUpdateMessageHandler_Invalid_Starttimeinterval(t *testing.T) { "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1", "timestampMs":1730722305297, "name":"opa-21cabb3e-f652-4ca6-b498-a77e62fcd059", - "pdpGroup":"defaultGroup", + "pdpGroup":"opaGroup", "pdpSubgroup":"opa" }` diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go index 4858bdf..3d19e6c 100644 --- a/pkg/kafkacomm/pdp_topic_consumer.go +++ b/pkg/kafkacomm/pdp_topic_consumer.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // kafkacomm package provides a structured way to create and manage Kafka consumers, @@ -20,12 +21,20 @@ package kafkacomm import ( + "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/log" + "sync" "time" ) +var ( + // Declare a global variable to hold the singleton KafkaConsumer + consumerInstance *KafkaConsumer + consumerOnce sync.Once // sync.Once ensures that the consumer is created only once +) + // KafkaConsumerInterface defines the interface for a Kafka consumer. type KafkaConsumerInterface interface { Close() error @@ -40,63 +49,92 @@ type KafkaConsumer struct { // Close closes the KafkaConsumer func (kc *KafkaConsumer) Close() { - kc.Consumer.Close() + if kc.Consumer != nil { + kc.Consumer.Close() + } } // Unsubscribe unsubscribes the KafkaConsumer func (kc *KafkaConsumer) Unsubscribe() error { - if err := kc.Consumer.Unsubscribe(); err != nil { - log.Warnf("Error Unsubscribing :%v", err) + if kc.Consumer == nil { + return fmt.Errorf("Kafka Consumer is nil so cannot Unsubscribe") + } + err := kc.Consumer.Unsubscribe() + if err != nil { + log.Warnf("Error Unsubscribing: %v", err) return err } - log.Debug("Unsubscribe From Topic") + log.Debug("Unsubscribed From Topic") return nil } -// creates a new Kafka consumer and returns it +// NewKafkaConsumer creates a new Kafka consumer and returns it func NewKafkaConsumer() (*KafkaConsumer, error) { - brokers := cfg.BootstrapServer - groupid := cfg.GroupId - topic := cfg.Topic - useSASL := cfg.UseSASLForKAFKA - username := cfg.KAFKA_USERNAME - password := cfg.KAFKA_PASSWORD + // Initialize the consumer instance only once + consumerOnce.Do(func() { + log.Debugf("Creating Kafka Consumer singleton instance") + brokers := cfg.BootstrapServer + groupid := cfg.GroupId + topic := cfg.Topic + useSASL := cfg.UseSASLForKAFKA + username := cfg.KAFKA_USERNAME + password := cfg.KAFKA_PASSWORD - // Add Kafka Connection Properties .... - configMap := &kafka.ConfigMap{ - "bootstrap.servers": brokers, - "group.id": groupid, - "auto.offset.reset": "earliest", - } - //for STRIMZI-KAFKA in case sasl is enabled - if useSASL == "true" { - configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") - configMap.SetKey("sasl.username", username) - configMap.SetKey("sasl.password", password) - configMap.SetKey("security.protocol", "SASL_PLAINTEXT") - } + // Add Kafka connection properties + configMap := &kafka.ConfigMap{ + "bootstrap.servers": brokers, + "group.id": groupid, + "auto.offset.reset": "latest", + } - // create new Kafka Consumer - consumer, err := kafka.NewConsumer(configMap) - if err != nil { - log.Warnf("Error creating consumer: %v\n", err) - return nil, err - } - //subscribe to topic - err = consumer.SubscribeTopics([]string{topic}, nil) - if err != nil { - log.Warnf("Error subcribing to topic: %v\n", err) - return nil, err + // If SASL is enabled, add SASL properties + if useSASL == "true" { + configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") + configMap.SetKey("sasl.username", username) + configMap.SetKey("sasl.password", password) + configMap.SetKey("security.protocol", "SASL_PLAINTEXT") + configMap.SetKey("session.timeout.ms", "30000") + configMap.SetKey("max.poll.interval.ms", "300000") + configMap.SetKey("enable.partition.eof", true) + configMap.SetKey("enable.auto.commit", true) + // configMap.SetKey("debug", "all") // Uncomment for debug + } + + // Create a new Kafka consumer + consumer, err := kafka.NewConsumer(configMap) + if err != nil { + log.Warnf("Error creating consumer: %v", err) + return + } + if consumer == nil { + log.Warnf("Kafka Consumer is nil after creation") + return + } + + // Subscribe to the topic + err = consumer.SubscribeTopics([]string{topic}, nil) + if err != nil { + log.Warnf("Error subscribing to topic: %v", err) + return + } + log.Debugf("Topic Subscribed: %v", topic) + + // Assign the consumer instance + consumerInstance = &KafkaConsumer{Consumer: consumer} + log.Debugf("Created SIngleton consumer instance") + }) + + // Return the singleton consumer instance + if consumerInstance == nil { + return nil, fmt.Errorf("Kafka Consumer instance not created") } - log.Debugf("Topic Subscribed... : %v", topic) - return &KafkaConsumer{Consumer: consumer}, nil + return consumerInstance, nil } -// gets the Kafka messages on the subscribed topic +// ReadKafkaMessages gets the Kafka messages on the subscribed topic func ReadKafkaMessages(kc *KafkaConsumer) ([]byte, error) { - msg, err := kc.Consumer.ReadMessage(-1) + msg, err := kc.Consumer.ReadMessage(100 * time.Millisecond) if err != nil { - log.Warnf("Error reading Kafka message: %v", err) return nil, err } return msg.Value, nil diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go index 2fdfa90..9feeeaa 100644 --- a/pkg/kafkacomm/pdp_topic_consumer_test.go +++ b/pkg/kafkacomm/pdp_topic_consumer_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go index 1b11b35..d8edb0b 100644 --- a/pkg/kafkacomm/pdp_topic_producer.go +++ b/pkg/kafkacomm/pdp_topic_producer.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // Package kafkacomm provides utilities for producing messages to a Kafka topic @@ -22,9 +23,9 @@ package kafkacomm import ( "github.com/confluentinc/confluent-kafka-go/kafka" + "log" "policy-opa-pdp/cfg" "sync" - "log" ) type KafkaProducerInterface interface { diff --git a/pkg/kafkacomm/pdp_topic_producer_test.go b/pkg/kafkacomm/pdp_topic_producer_test.go index 55f3bc8..3379845 100644 --- a/pkg/kafkacomm/pdp_topic_producer_test.go +++ b/pkg/kafkacomm/pdp_topic_producer_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go index f814992..fbd07d6 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // The publisher package is responsible for managing periodic heartbeat messages for the @@ -55,7 +56,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { if ticker != nil { ticker.Stop() } - // StopTicker() + // StopTicker() currentInterval = intervalMs ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond) diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go index f03b0eb..e95866e 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration.go b/pkg/kafkacomm/publisher/pdp-pap-registration.go index 75f22d6..54b12ea 100644 --- a/pkg/kafkacomm/publisher/pdp-pap-registration.go +++ b/pkg/kafkacomm/publisher/pdp-pap-registration.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // allows to send the pdp registartion message with unique transaction id and timestamp to topic diff --git a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go index 03749de..725b4b9 100644 --- a/pkg/kafkacomm/publisher/pdp-pap-registration_test.go +++ b/pkg/kafkacomm/publisher/pdp-pap-registration_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go index 756d0f2..4a13b1c 100644 --- a/pkg/kafkacomm/publisher/pdp-status-publisher.go +++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go index 5e02704..83154ca 100644 --- a/pkg/kafkacomm/publisher/pdp-status-publisher_test.go +++ b/pkg/kafkacomm/publisher/pdp-status-publisher_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telecom +// Copyright (C) 2024: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // ========================LICENSE_END=================================== // |