summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java62
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java59
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java24
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java6
7 files changed, 135 insertions, 23 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
index 47279d47..45a8be3f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
@@ -65,6 +65,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
}
var kafkaTopicSource = makeSource(busTopicParams);
+
kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
return kafkaTopicSource;
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 8d88b0d9..ee41150f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -32,6 +32,8 @@ import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -42,6 +44,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -235,10 +239,13 @@ public interface BusConsumer {
*/
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+ private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
/**
* Kafka consumer.
*/
- private KafkaConsumer<String, String> consumer;
+ protected KafkaConsumer<String, String> consumer;
+ protected Properties kafkaProps;
/**
* Kafka Consumer Wrapper.
@@ -250,20 +257,67 @@ public interface BusConsumer {
* @throws GeneralSecurityException - Security exception
* @throws MalformedURLException - Malformed URL exception
*/
- public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
super(busTopicParams);
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ busTopicParams.getServers().get(0));
+
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
+ }
+ consumer = new KafkaConsumer<>(kafkaProps);
+ //Subscribe to the topic
+ consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
}
@Override
public Iterable<String> fetch() throws IOException {
- // TODO: Not implemented yet
- return new ArrayList<>();
+ ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+ if (records == null || records.count() <= 0) {
+ return Collections.emptyList();
+ }
+ List<String> messages = new ArrayList<>(records.count());
+ try {
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(record.value());
+ }
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+ sleepAfterFetchFailure();
+ throw e;
+ }
+ return messages;
}
@Override
public void close() {
super.close();
this.consumer.close();
+ logger.info("Kafka Consumer exited {}", this);
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index e0df7095..fe9bab23 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -32,13 +32,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.record.CompressionType;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
@@ -155,22 +156,45 @@ public interface BusPublisher {
public static class KafkaPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+ private String topic;
/**
- * The actual Kafka publisher.
+ * Kafka publisher.
*/
- private final KafkaProducer producer;
+ private Producer<String, String> producer;
+ protected Properties kafkaProps;
/**
- * Constructor.
+ * Kafka Publisher Wrapper.
*
* @param busTopicParams topic parameters
*/
- public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
- // TODO Setting of topic parameters is not implemented yet.
- //Setup Properties for Kafka Producer
- Properties kafkaProps = new Properties();
- this.producer = new KafkaProducer(kafkaProps);
+ protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ this.topic = busTopicParams.getTopic();
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+ if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+
+ producer = new KafkaProducer<>(kafkaProps);
}
@Override
@@ -178,7 +202,18 @@ public interface BusPublisher {
if (message == null) {
throw new IllegalArgumentException("No message provided");
}
- // TODO Sending messages is not implemented yet
+
+ try {
+ //Create the record
+ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
+ UUID.randomUUID().toString(), message);
+
+ this.producer.send(record);
+ producer.flush();
+ } catch (Exception e) {
+ logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+ return false;
+ }
return true;
}
@@ -186,7 +221,7 @@ public interface BusPublisher {
public void close() {
logger.info("{}: CLOSE", this);
- try (this.producer) {
+ try {
this.producer.close();
} catch (Exception e) {
logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index b564229b..6574d408 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -18,6 +18,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.slf4j.Logger;
@@ -34,6 +35,8 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
*/
private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ protected Map<String, String> additionalProps = null;
+
/**
* Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
* attributes.
@@ -47,6 +50,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
*/
public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
+ this.additionalProps = busTopicParams.getAdditionalProps();
}
/**
@@ -59,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
.servers(this.servers)
.topic(this.effectiveTopic)
.useHttps(this.useHttps)
+ .additionalProps(this.additionalProps)
.build());
logger.info("{}: KAFKA SINK created", this);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index b8362b83..2a651ee7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -18,6 +18,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.net.MalformedURLException;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
@@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
+ protected Map<String, String> additionalProps = null;
+
/**
* Constructor.
*
@@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource
*/
public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- this.init();
+ this.additionalProps = busTopicParams.getAdditionalProps();
+ try {
+ this.init();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e);
+ }
}
/**
* Initialize the Cambria client.
*/
@Override
- public void init() {
- this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder()
+ public void init() throws MalformedURLException {
+ BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
.servers(this.servers)
.topic(this.effectiveTopic)
- .useHttps(this.useHttps)
- .build());
+ .fetchTimeout(this.fetchTimeout)
+ .consumerGroup(this.consumerGroup)
+ .useHttps(this.useHttps);
+
+ this.consumer = new BusConsumer.KafkaConsumerWrapper(builder
+ .additionalProps(this.additionalProps)
+ .build());
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
index 49dff287..46a6c398 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
@@ -129,7 +129,6 @@ public final class PolicyEndPointProperties {
public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
/* Topic Sink Values */
/**
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
index 3e62f98f..113a4bd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
@@ -24,9 +24,13 @@ package org.onap.policy.common.endpoints.utils;
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@@ -47,7 +51,7 @@ public class KafkaPropertyUtils {
public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
- //TODO More Kafka properties to be added
+
return BusTopicParams.builder()
.servers(serverList)
.topic(topic)