diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-01 07:15:00 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-22 20:17:46 +0100 |
commit | 2a2b5d085876480c1b0d9470a57c6cab4f51008c (patch) | |
tree | 64bd2ded017ad43bd995686e2e7b8f1ffa412a40 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal | |
parent | 15ede09d6ef4b52db05fab534eed7192991b1f98 (diff) |
Introduce Custom Kafka End point
Issue-ID: POLICY-4133
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: I2745f3af97e9bb83d94c5cb6d29dfd452d315506
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
4 files changed, 252 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 20f4c91c..8d88b0d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,13 +29,19 @@ import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -219,6 +226,53 @@ public interface BusConsumer { } /** + * Kafka based consumer. + */ + public static class KafkaConsumerWrapper extends FetchingBusConsumer { + + /** + * logger. + */ + private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + + /** + * Kafka consumer. + */ + private KafkaConsumer<String, String> consumer; + + /** + * Kafka Consumer Wrapper. + * BusTopicParam object contains the following parameters + * servers messaging bus hosts. + * topic topic + * + * @param busTopicParams - The parameters for the bus topic + * @throws GeneralSecurityException - Security exception + * @throws MalformedURLException - Malformed URL exception + */ + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() throws IOException { + // TODO: Not implemented yet + return new ArrayList<>(); + } + + @Override + public void close() { + super.close(); + this.consumer.close(); + } + + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } + } + + /** * MR based consumer. */ public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 8bf805bf..e0df7095 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -144,6 +150,58 @@ public interface BusPublisher { } /** + * Kafka based library publisher. + */ + public static class KafkaPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + + /** + * The actual Kafka publisher. + */ + private final KafkaProducer producer; + + /** + * Constructor. + * + * @param busTopicParams topic parameters + */ + public KafkaPublisherWrapper(BusTopicParams busTopicParams) { + // TODO Setting of topic parameters is not implemented yet. + //Setup Properties for Kafka Producer + Properties kafkaProps = new Properties(); + this.producer = new KafkaProducer(kafkaProps); + } + + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + // TODO Sending messages is not implemented yet + return true; + } + + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try (this.producer) { + this.producer.close(); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + + @Override + public String toString() { + return "KafkaPublisherWrapper []"; + } + + } + + /** * DmaapClient library wrapper. */ public abstract class DmaapPublisherWrapper implements BusPublisher { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java new file mode 100644 index 00000000..b564229b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation publishes events for the associated KAFKA topic, inline with the calling + * thread. + */ +public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + + /** + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned + * attributes. + * + * <p>servers list of KAFKA servers available for publishing + * topic the topic to publish to + * partitionId the partition key (optional, autogenerated if not provided) + * useHttps does connection use HTTPS? + * @param busTopicParams contains attributes needed + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineKafkaTopicSink(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + /** + * Instantiation of internal resources. + */ + @Override + public void init() { + + this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.effectiveTopic) + .useHttps(this.useHttps) + .build()); + logger.info("{}: KAFKA SINK created", this); + } + + @Override + public String toString() { + return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.KAFKA; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java new file mode 100644 index 00000000..b8362b83 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; + +/** + * This topic source implementation specializes in reading messages over an Kafka Bus topic source and + * notifying its listeners. + */ +public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + + /** + * Constructor. + * + * @param busTopicParams Parameters object containing all the required inputs + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { + super(busTopicParams); + this.init(); + } + + /** + * Initialize the Cambria client. + */ + @Override + public void init() { + this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.effectiveTopic) + .useHttps(this.useHttps) + .build()); + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.KAFKA; + } + + @Override + public String toString() { + return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + + ", toString()=" + super.toString() + "]"; + } + +} |