aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/kafkacomm/pdp_topic_consumer.go
blob: 3d19e6c925ace288cf6a50540886b663acc08f45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// -
//   ========================LICENSE_START=================================
//   Copyright (C) 2024: Deutsche Telekom
//
//   Licensed under the Apache License, Version 2.0 (the "License");
//   you may not use this file except in compliance with the License.
//   You may obtain a copy of the License at
//
//        http://www.apache.org/licenses/LICENSE-2.0
//
//   Unless required by applicable law or agreed to in writing, software
//   distributed under the License is distributed on an "AS IS" BASIS,
//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//   See the License for the specific language governing permissions and
//   limitations under the License.
//   SPDX-License-Identifier: Apache-2.0
//   ========================LICENSE_END===================================

// Package kafkacomm provides a structured way to create and manage Kafka consumers,
// handle subscriptions, and read messages from Kafka topics.
package kafkacomm

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"policy-opa-pdp/cfg"
	"policy-opa-pdp/pkg/log"
	"sync"
	"time"
)

// consumerInstance holds the package-wide KafkaConsumer singleton. It stays nil
// until NewKafkaConsumer has created and subscribed a consumer successfully.
var consumerInstance *KafkaConsumer

// consumerOnce guards the creation of consumerInstance so that the creation
// code in NewKafkaConsumer runs only once.
var consumerOnce sync.Once

// KafkaConsumerInterface is the set of consumer operations this package relies
// on. KafkaConsumer stores its underlying client as a value of this interface.
type KafkaConsumerInterface interface {
	// ReadMessage returns a message or an error. The timeout is handed to the
	// implementation (presumably the maximum time to wait for a message).
	ReadMessage(timeout time.Duration) (*kafka.Message, error)

	// Unsubscribe drops the consumer's current subscription and reports any failure.
	Unsubscribe() error

	// Close shuts the consumer down and reports any failure.
	Close() error
}

// KafkaConsumer is a wrapper around the Kafka consumer.
// The wrapped client is held as a KafkaConsumerInterface rather than a concrete
// type, so any implementation of that interface can be stored in it.
type KafkaConsumer struct {
	// Consumer is the underlying client the wrapper delegates to. Close and
	// Unsubscribe check it for nil before using it.
	Consumer KafkaConsumerInterface
}

// Close closes the underlying Kafka consumer, if there is one.
//
// Calling Close on a nil *KafkaConsumer or on a wrapper without a Consumer is a
// no-op. Close has no return value, so a failure reported by the underlying
// consumer is logged as a warning instead of being silently discarded.
func (kc *KafkaConsumer) Close() {
	if kc == nil || kc.Consumer == nil {
		return
	}
	if err := kc.Consumer.Close(); err != nil {
		log.Warnf("Error closing Kafka Consumer: %v", err)
	}
}

// Unsubscribe removes the underlying consumer's current subscription.
//
// It returns an error when the wrapper holds no Consumer. An error reported by
// the underlying Unsubscribe call is logged as a warning and returned unchanged.
// On success a debug message is logged and nil is returned.
func (kc *KafkaConsumer) Unsubscribe() error {
	consumer := kc.Consumer
	if consumer == nil {
		return fmt.Errorf("Kafka Consumer is nil so cannot Unsubscribe")
	}

	unsubErr := consumer.Unsubscribe()
	if unsubErr == nil {
		log.Debug("Unsubscribed From Topic")
		return nil
	}

	log.Warnf("Error Unsubscribing: %v", unsubErr)
	return unsubErr
}

// NewKafkaConsumer returns the package-wide KafkaConsumer singleton, creating it
// on the first call.
//
// Creation builds the client configuration from the cfg package (bootstrap
// servers, group id and, when cfg.UseSASLForKAFKA is "true", the SASL related
// settings), creates the consumer and subscribes it to cfg.Topic. Because this
// work runs inside consumerOnce it is attempted only once: if any step fails,
// the failure is logged and this call and every later call return a nil
// consumer together with an error.
func NewKafkaConsumer() (*KafkaConsumer, error) {
	// Initialize the consumer instance only once
	consumerOnce.Do(func() {
		log.Debugf("Creating Kafka Consumer singleton instance")
		topic := cfg.Topic

		// Add Kafka connection properties
		configMap := &kafka.ConfigMap{
			"bootstrap.servers": cfg.BootstrapServer,
			"group.id":          cfg.GroupId,
			"auto.offset.reset": "latest",
		}

		// If SASL is enabled, add SASL properties
		// NOTE(review): the session/poll/EOF/auto-commit settings are only applied
		// when SASL is enabled, exactly as before — confirm that this is intended.
		if cfg.UseSASLForKAFKA == "true" {
			saslProperties := []struct {
				key   string
				value interface{}
			}{
				{"sasl.mechanism", "SCRAM-SHA-512"},
				{"sasl.username", cfg.KAFKA_USERNAME},
				{"sasl.password", cfg.KAFKA_PASSWORD},
				{"security.protocol", "SASL_PLAINTEXT"},
				{"session.timeout.ms", "30000"},
				{"max.poll.interval.ms", "300000"},
				{"enable.partition.eof", true},
				{"enable.auto.commit", true},
				// {"debug", "all"}, // Uncomment for debug
			}
			for _, property := range saslProperties {
				// SetKey reports an error; do not create a consumer from a
				// configuration that could not be assembled completely.
				if err := configMap.SetKey(property.key, property.value); err != nil {
					log.Warnf("Error setting consumer property %s: %v", property.key, err)
					return
				}
			}
		}

		// Create a new Kafka consumer
		consumer, err := kafka.NewConsumer(configMap)
		if err != nil {
			log.Warnf("Error creating consumer: %v", err)
			return
		}
		if consumer == nil {
			log.Warnf("Kafka Consumer is nil after creation")
			return
		}

		// Subscribe to the topic
		if err := consumer.SubscribeTopics([]string{topic}, nil); err != nil {
			log.Warnf("Error subscribing to topic: %v", err)
			// The consumer was created but will never be handed out, so close it
			// here; otherwise it would be leaked.
			if closeErr := consumer.Close(); closeErr != nil {
				log.Warnf("Error closing consumer after failed subscription: %v", closeErr)
			}
			return
		}
		log.Debugf("Topic Subscribed: %v", topic)

		// Assign the consumer instance
		consumerInstance = &KafkaConsumer{Consumer: consumer}
		log.Debugf("Created SIngleton consumer instance")
	})

	// Return the singleton consumer instance
	if consumerInstance == nil {
		return nil, fmt.Errorf("Kafka Consumer instance not created")
	}
	return consumerInstance, nil
}

// ReadKafkaMessages reads one message from the topic the consumer is subscribed
// to and returns its payload (the message's Value).
//
// ReadMessage is called with a 100ms timeout, and any error it reports is
// returned unchanged. Instead of panicking, an error is also returned when kc
// or its Consumer is nil, or when the consumer yields neither a message nor an
// error.
func ReadKafkaMessages(kc *KafkaConsumer) ([]byte, error) {
	if kc == nil || kc.Consumer == nil {
		return nil, fmt.Errorf("Kafka Consumer is nil so cannot read messages")
	}

	msg, err := kc.Consumer.ReadMessage(100 * time.Millisecond)
	if err != nil {
		return nil, err
	}
	// Guard against an implementation returning (nil, nil); dereferencing msg
	// would panic in that case.
	if msg == nil {
		return nil, fmt.Errorf("Kafka Consumer returned no message")
	}
	return msg.Value, nil
}