aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java564
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java354
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java48
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java59
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java75
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java130
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java (renamed from policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java)43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java72
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java143
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java (renamed from policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java)48
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java60
11 files changed, 364 insertions, 1232 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 0f31bf7d..b46c2715 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -2,8 +2,10 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,22 +23,31 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,126 +71,54 @@ public interface BusConsumer {
public void close();
/**
- * BusConsumer that supports server-side filtering.
+ * Consumer that handles fetch() failures by sleeping.
*/
- public interface FilterableBusConsumer extends BusConsumer {
+ abstract class FetchingBusConsumer implements BusConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
/**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws IllegalArgumentException if the consumer cannot be built with the new filter
- */
- public void setFilter(String filter);
- }
-
- /**
- * Cambria based consumer.
- */
- public static class CambriaConsumerWrapper implements FilterableBusConsumer {
-
- /**
- * logger.
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
+ * Fetch timeout.
*/
- private final ConsumerBuilder builder;
-
- /**
- * Locked while updating {@link #consumer} and {@link #newConsumer}.
- */
- private final Object consLocker = new Object();
-
- /**
- * Cambria client.
- */
- private CambriaConsumer consumer;
+ protected int fetchTimeout;
/**
- * Cambria client to use for next fetch.
+ * Time to sleep on a fetch failure.
*/
- private CambriaConsumer newConsumer = null;
+ @Getter
+ private final int sleepTime;
/**
- * fetch timeout.
+ * Counted down when {@link #close()} is invoked.
*/
- protected int fetchTimeout;
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
- /**
- * close condition.
- */
- protected CountDownLatch closeCondition = new CountDownLatch(1);
/**
- * Cambria Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * Constructs the object.
*
- * @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
+ * @param busTopicParams parameters for the bus topic
*/
- public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
-
+ protected FetchingBusConsumer(BusTopicParams busTopicParams) {
this.fetchTimeout = busTopicParams.getFetchTimeout();
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (busTopicParams.isUseHttps()) {
- builder.usingHttps();
-
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- try {
- return getCurrentConsumer().fetch();
- } catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
- sleepAfterFetchFailure();
- throw e;
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
}
}
- private void sleepAfterFetchFailure() {
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
} catch (InterruptedException e) {
logger.warn("{}: interrupted while handling fetch error", this, e);
@@ -190,365 +129,148 @@ public interface BusConsumer {
@Override
public void close() {
this.closeCondition.countDown();
- getCurrentConsumer().close();
- }
-
- private CambriaConsumer getCurrentConsumer() {
- CambriaConsumer old = null;
- CambriaConsumer ret;
-
- synchronized (consLocker) {
- if (this.newConsumer != null) {
- // replace old consumer with new consumer
- old = this.consumer;
- this.consumer = this.newConsumer;
- this.newConsumer = null;
- }
-
- ret = this.consumer;
- }
-
- if (old != null) {
- old.close();
- }
-
- return ret;
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
- builder.withServerSideFilter(filter);
-
- try {
- CambriaConsumer previous;
- synchronized (consLocker) {
- previous = this.newConsumer;
- this.newConsumer = builder.build();
- }
-
- if (previous != null) {
- // there was already a new consumer - close it
- previous.close();
- }
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- /*
- * Since an exception occurred, "consumer" still has its old value, thus it should
- * not be closed at this point.
- */
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
/**
- * MR based consumer.
+ * Kafka based consumer.
*/
- public abstract class DmaapConsumerWrapper implements BusConsumer {
+ class KafkaConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
- /**
- * fetch timeout.
- */
- protected int fetchTimeout;
+ private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
/**
- * close condition.
+ * Kafka consumer.
*/
- protected CountDownLatch closeCondition = new CountDownLatch(1);
+ protected KafkaConsumer<String, String> consumer;
+ protected Properties kafkaProps;
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
+ protected boolean allowTracing;
/**
- * MR Consumer Wrapper.
+ * Kafka Consumer Wrapper.
+ * BusTopicParam - object contains the following parameters
+ * servers - messaging bus hosts.
+ * topic - topic
*
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * username AAF Login
- * password AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
- *
- * @param busTopicParams contains above listed attributes
- * @throws MalformedURLException URL should be valid
+ * @param busTopicParams - The parameters for the bus topic
*/
- public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- this.fetchTimeout = busTopicParams.getFetchTimeout();
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+ super(busTopicParams);
if (busTopicParams.isTopicInvalid()) {
- throw new IllegalArgumentException("No topic for DMaaP");
+ throw new IllegalArgumentException("No topic for Kafka");
}
- this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
- busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
- busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
- busTopicParams.getApiKey(), busTopicParams.getApiSecret());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ busTopicParams.getServers().get(0));
- sleepAfterFetchFailure();
-
- /* fall through */
- }
+ if (busTopicParams.isAdditionalPropsValid()) {
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
+ if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
}
- }
-
- private void sleepAfterFetchFailure() {
- try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
- } catch (InterruptedException e) {
- logger.warn("{}: interrupted while handling fetch error", this, e);
- Thread.currentThread().interrupt();
+ if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
}
- }
-
- @Override
- public void close() {
- this.closeCondition.countDown();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- /**
- * MR based consumer.
- */
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * BusTopicParams contain the following parameters.
- * MR Consumer Wrapper.
- *
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * aafLogin AAF Login
- * aafPassword AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
- *
- * @param busTopicParams contains above listed params
- * @throws MalformedURLException URL should be valid
- */
- public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- super(busTopicParams);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if (busTopicParams.isServersInvalid()) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
}
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
+ if (busTopicParams.isAllowTracing()) {
+ this.allowTracing = true;
+ kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
}
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
+ consumer = new KafkaConsumer<>(kafkaProps);
+ //Subscribe to the topic
+ consumer.subscribe(List.of(busTopicParams.getTopic()));
}
@Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic paramters
- *
- * @throws MalformedURLException must provide a valid URL
- */
- public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
-
- super(busTopicParams);
-
-
- final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
-
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ public Iterable<String> fetch() {
+ ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+ if (records == null || records.count() <= 0) {
+ return Collections.emptyList();
}
+ List<String> messages = new ArrayList<>(records.count());
+ try {
+ if (allowTracing) {
+ createParentTraceContext(records);
+ }
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
+ messages.add(partitionRecord.value());
+ }
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch, throwing exception after sleep...", this);
+ sleepAfterFetchFailure();
+ throw e;
}
+ return messages;
+ }
- final String serviceName = busTopicParams.getServers().get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
-
- props = new Properties();
-
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", busTopicParams.getUserName());
- props.setProperty("password", busTopicParams.getPassword());
-
- /* These are required, no defaults */
- props.setProperty("topic", busTopicParams.getTopic());
-
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
+ private void createParentTraceContext(ConsumerRecords<String, String> records) {
+ TraceParentInfo traceParentInfo = new TraceParentInfo();
+ for (ConsumerRecord<String, String> consumerRecord : records) {
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
+ Headers consumerRecordHeaders = consumerRecord.headers();
+ traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
}
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
+ SpanContext spanContext = SpanContext.createFromRemoteParent(
+ traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
+ TraceFlags.getSampled(), TraceState.builder().build());
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
+ Context.current().with(Span.wrap(spanContext)).makeCurrent();
+ }
- props.setProperty("contenttype", "application/json");
+ private TraceParentInfo processTraceParentHeader(Headers headers) {
+ TraceParentInfo traceParentInfo = new TraceParentInfo();
+ if (headers.lastHeader("traceparent") != null) {
+ traceParentInfo.setParentTraceId(new String(headers.lastHeader(
+ "traceparent").value(), StandardCharsets.UTF_8));
- if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
+ String[] parts = traceParentInfo.getParentTraceId().split("-");
+ traceParentInfo.setTraceId(parts[1]);
+ traceParentInfo.setSpanId(parts[2]);
}
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
+ return traceParentInfo;
+ }
- logger.info("{}: CREATION", this);
+ @Data
+ @NoArgsConstructor
+ private static class TraceParentInfo {
+ private String parentTraceId;
+ private String traceId;
+ private String spanId;
}
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
+ @Override
+ public void close() {
+ super.close();
+ this.consumer.close();
+ logger.info("Kafka Consumer exited {}", this);
+ }
+ @Override
+ public String toString() {
+ return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 469794c7..1b57e48e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -2,8 +2,10 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,29 +23,22 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import java.util.UUID;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface BusPublisher {
+ String NO_MESSAGE_PROVIDED = "No message provided";
+ String LOG_CLOSE = "{}: CLOSE";
+ String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
+
/**
* sends a message.
*
@@ -52,73 +47,76 @@ public interface BusPublisher {
* @return true if success, false otherwise
* @throws IllegalArgumentException if no message provided
*/
- public boolean send(String partitionId, String message);
+ boolean send(String partitionId, String message);
/**
* closes the publisher.
*/
- public void close();
+ void close();
/**
- * Cambria based library publisher.
+ * Kafka based library publisher.
*/
- public static class CambriaPublisherWrapper implements BusPublisher {
+ class KafkaPublisherWrapper implements BusPublisher {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
+ private final String topic;
/**
- * The actual Cambria publisher.
+ * Kafka publisher.
*/
- @JsonIgnore
- @GsonJsonIgnore
- protected CambriaBatchingPublisher publisher;
+ private final Producer<String, String> producer;
+ protected Properties kafkaProps;
/**
- * Constructor.
+ * Kafka Publisher Wrapper.
*
* @param busTopicParams topic parameters
*/
- public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
-
- PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
+ protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
- // Set read timeout to 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(30000);
-
- if (busTopicParams.isUseHttps()) {
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
- } else {
- builder.withConnectionType(ConnectionType.HTTPS);
- }
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
}
+ this.topic = busTopicParams.getTopic();
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
+ // Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+ if (busTopicParams.isAdditionalPropsValid()) {
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
+ if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+ if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
}
- try {
- this.publisher = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
+ if (busTopicParams.isAllowTracing()) {
+ kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingProducerInterceptor.class.getName());
}
+
+ producer = new KafkaProducer<>(kafkaProps);
}
@Override
public boolean send(String partitionId, String message) {
if (message == null) {
- throw new IllegalArgumentException("No message provided");
+ throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
}
try {
- this.publisher.send(partitionId, message);
+ // Create the record
+ ProducerRecord<String, String> producerRecord =
+ new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
+
+ this.producer.send(producerRecord);
+ producer.flush();
} catch (Exception e) {
logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
return false;
@@ -128,263 +126,19 @@ public interface BusPublisher {
@Override
public void close() {
- logger.info("{}: CLOSE", this);
-
- try {
- this.publisher.close();
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
- }
- }
-
-
- @Override
- public String toString() {
- return "CambriaPublisherWrapper []";
- }
-
- }
-
- /**
- * DmaapClient library wrapper.
- */
- public abstract class DmaapPublisherWrapper implements BusPublisher {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
-
- /**
- * MR based Publisher.
- */
- protected MRSimplerBatchPublisher publisher;
- protected Properties props;
-
- /**
- * MR Publisher Wrapper.
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param username AAF or DME2 Login
- * @param password AAF or DME2 Password
- */
- public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
- String username, String password, boolean useHttps) {
-
-
- if (StringUtils.isBlank(topic)) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
-
- configureProtocol(topic, protocol, servers, useHttps);
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
-
- props.setProperty("Protocol", (useHttps ? "https" : "http"));
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
- }
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
- }
-
- private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
- boolean useHttps) {
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
- }
-
- ArrayList<String> dmaapServers = new ArrayList<>();
- String port = useHttps ? ":3905" : ":3904";
- for (String server : servers) {
- dmaapServers.add(server + port);
- }
-
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- } else if (protocol == ProtocolTypeConstants.DME2) {
- ArrayList<String> dmaapServers = new ArrayList<>();
- dmaapServers.add("0.0.0.0:3904");
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- } else {
- throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
- }
- }
-
- @Override
- public void close() {
- logger.info("{}: CLOSE", this);
+ logger.info(LOG_CLOSE, this);
try {
- this.publisher.close(1, TimeUnit.SECONDS);
+ this.producer.close();
} catch (Exception e) {
logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
}
}
@Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException("No message provided");
- }
-
- this.publisher.setPubResponse(new MRPublisherResponse());
- this.publisher.send(partitionId, message);
- MRPublisherResponse response = this.publisher.sendBatchWithResponse();
- if (response != null) {
- logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
- }
-
- return true;
- }
-
- @Override
public String toString() {
- return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
- + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
- + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
- + ", publisher.getUsername()=" + publisher.getUsername() + "]";
+ return "KafkaPublisherWrapper []";
}
- }
-
- /**
- * DmaapClient library wrapper.
- */
- public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
- /**
- * MR based Publisher.
- */
- public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
- boolean useHttps) {
-
- super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
- }
- }
-
- public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- */
- public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
-
- super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
- busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
-
- String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null;
- validateParams(busTopicParams, dme2RouteOffer);
-
- String serviceName = busTopicParams.getServers().get(0);
-
- /* These are required, no defaults */
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
-
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- // ServiceName also a default, found in additionalProps
-
- /* These are optional, will default to these values if not set in optionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "POST");
-
- if (busTopicParams.isAdditionalPropsValid()) {
- addAdditionalProps(busTopicParams);
- }
-
- this.publisher.setProps(props);
- }
-
- private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
-
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
- }
-
- private void addAdditionalProps(BusTopicParams busTopicParams) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
- }
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
index ccf25753..f8236d3d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
@@ -2,14 +2,15 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +21,13 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled;
/**
* Bus Topic Base.
*/
+@Getter
public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
/**
@@ -43,58 +46,37 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
protected boolean useHttps;
/**
+ * Allow tracing.
+ */
+ protected boolean allowTracing;
+
+ /**
* allow self signed certificates.
*/
protected boolean allowSelfSignedCerts;
/**
* Instantiates a new Bus Topic Base.
- *
+ *
* <p>servers list of servers
* topic topic name
* apiKey API Key
* apiSecret API Secret
* useHttps does connection use HTTPS?
+ * allowTracing Is tracing allowed?
* allowSelfSignedCerts are self-signed certificates allow
* @param busTopicParams holds all our parameters
* @throws IllegalArgumentException if invalid parameters are present
*/
- public BusTopicBase(BusTopicParams busTopicParams) {
+ protected BusTopicBase(BusTopicParams busTopicParams) {
super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getEffectiveTopic());
this.apiKey = busTopicParams.getApiKey();
this.apiSecret = busTopicParams.getApiSecret();
this.useHttps = busTopicParams.isUseHttps();
+ this.allowTracing = busTopicParams.isAllowTracing();
this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
}
- @Override
- public String getApiKey() {
- return apiKey;
- }
-
- @Override
- public String getApiSecret() {
- return apiSecret;
- }
-
- /**
- * Is using HTTPS.
- *
- * @return if using https
- */
- public boolean isUseHttps() {
- return useHttps;
- }
-
- /**
- * Is self signed certificates allowed.
- *
- * @return if self signed certificates are allowed
- */
- public boolean isAllowSelfSignedCerts() {
- return allowSelfSignedCerts;
- }
-
protected boolean anyNullOrEmpty(String... args) {
for (String arg : args) {
if (arg == null || arg.isEmpty()) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index 9df7221f..53a6ab66 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -3,8 +3,8 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
- * Modifications Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,21 +24,23 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.util.List;
import java.util.Map;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
/**
* Member variables of this Params class are as follows.
*
- * <p>servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * consumerGroup DMaaP Reader Consumer Group
- * consumerInstance DMaaP Reader Instance
- * fetchTimeout DMaaP fetch timeout
- * fetchLimit DMaaP fetch limit
+ * <p>servers Kafka servers
+ * topic Kafka Topic to be monitored
+ * apiKey Kafka API Key (optional)
+ * apiSecret Kafka API Secret (optional)
+ * consumerGroup kafka Reader Consumer Group
+ * consumerInstance Kafka Reader Instance
+ * fetchTimeout kafka fetch timeout
+ * fetchLimit Kafka fetch limit
* environment DME2 Environment
* aftEnvironment DME2 AFT Environment
* partner DME2 Partner
@@ -46,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
* longitude DME2 Longitude
* additionalProps Additional properties to pass to DME2
* useHttps does connection use HTTPS?
+ * allowTracing is message tracing allowed?
* allowSelfSignedCerts are self-signed certificates allow
*/
@Getter
@@ -64,6 +67,7 @@ public class BusTopicParams {
private int fetchTimeout;
private int fetchLimit;
private boolean useHttps;
+ private boolean allowTracing;
private boolean allowSelfSignedCerts;
private boolean managed;
@@ -78,6 +82,7 @@ public class BusTopicParams {
private String clientName;
private String hostname;
private String basePath;
+ @Getter
private String serializationProvider;
public static TopicParamsBuilder builder() {
@@ -165,29 +170,43 @@ public class BusTopicParams {
return additionalProps != null;
}
- public String getSerializationProvider() {
- return serializationProvider;
+ public void setEffectiveTopic(String effectiveTopic) {
+ this.effectiveTopic = topicToLowerCase(effectiveTopic);
}
+ public void setTopic(String topic) {
+ this.topic = topicToLowerCase(topic);
+ }
+
+ public String getEffectiveTopic() {
+ return topicToLowerCase(effectiveTopic);
+ }
+
+ public String getTopic() {
+ return topicToLowerCase(topic);
+ }
+
+ private String topicToLowerCase(String topic) {
+ return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase();
+ }
+
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
final BusTopicParams params = new BusTopicParams();
- private TopicParamsBuilder() {
- }
-
public TopicParamsBuilder servers(List<String> servers) {
this.params.servers = servers;
return this;
}
public TopicParamsBuilder topic(String topic) {
- this.params.topic = topic;
+ this.params.setTopic(topic);
return this;
}
public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
- this.params.effectiveTopic = effectiveTopic;
+ this.params.setEffectiveTopic(effectiveTopic);
return this;
}
@@ -226,6 +245,11 @@ public class BusTopicParams {
return this;
}
+ public TopicParamsBuilder allowTracing(boolean allowTracing) {
+ this.params.allowTracing = allowTracing;
+ return this;
+ }
+
public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
this.params.allowSelfSignedCerts = allowSelfSignedCerts;
return this;
@@ -309,7 +333,6 @@ public class BusTopicParams {
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index e94bdffa..9b724072 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -2,8 +2,10 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +24,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.util.UUID;
-
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
@@ -30,8 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
- * regardless if it is UEB or DMaaP.
+ * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink.
*
*/
public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
@@ -44,7 +46,9 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
/**
* The partition key to publish to.
*/
- protected String partitionId;
+ @Getter
+ @Setter
+ protected String partitionKey;
/**
* Message bus publisher.
@@ -60,17 +64,18 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* apiSecret api secret
* partitionId partition id
* useHttps does connection use HTTPS?
+ * allowTracing is tracing allowed?
* allowSelfSignedCerts are self-signed certificates allow *
- * @throws IllegalArgumentException in invalid parameters are passed in
+ * @throws IllegalArgumentException if invalid parameters are passed in
*/
- public InlineBusTopicSink(BusTopicParams busTopicParams) {
+ protected InlineBusTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
if (busTopicParams.isPartitionIdInvalid()) {
- this.partitionId = UUID.randomUUID().toString();
+ this.partitionKey = UUID.randomUUID().toString();
} else {
- this.partitionId = busTopicParams.getPartitionId();
+ this.partitionKey = busTopicParams.getPartitionId();
}
}
@@ -84,17 +89,14 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
logger.info("{}: starting", this);
synchronized (this) {
+ if (!this.alive) {
+ if (locked) {
+ throw new IllegalStateException(this + " is locked.");
+ }
- if (this.alive) {
- return true;
- }
-
- if (locked) {
- throw new IllegalStateException(this + " is locked.");
+ this.init();
+ this.alive = true;
}
-
- this.init();
- this.alive = true;
}
return true;
@@ -142,7 +144,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
- publisher.send(this.partitionId, message);
+ publisher.send(this.partitionKey, message);
broadcast(message);
} catch (Exception e) {
logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
@@ -153,44 +155,13 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
@Override
- public void setPartitionKey(String partitionKey) {
- this.partitionId = partitionKey;
- }
-
- @Override
- public String getPartitionKey() {
- return this.partitionId;
- }
-
- @Override
public void shutdown() {
this.stop();
}
@Override
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public String toString() {
- return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
+ return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
+ + "]";
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
deleted file mode 100644
index ba556bb8..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.util.Map;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implementation publishes events for the associated DMAAP topic, inline with the calling
- * thread.
- */
-public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
-
- protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
- /**
- * BusTopicParams contains the below mentioned attributes.
- * servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * environment DME2 Environment
- * aftEnvironment DME2 AFT Environment
- * partner DME2 Partner
- * latitude DME2 Latitude
- * longitude DME2 Longitude
- * additionalProps Additional properties to pass to DME2
- * useHttps does connection use HTTPS?
- * allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams Contains the above mentioned parameters
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public InlineDmaapTopicSink(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- }
-
-
- @Override
- public void init() {
- if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .userName(this.userName)
- .password(this.password)
- .useHttps(this.useHttps)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .useHttps(this.useHttps)
- .build());
- }
-
- logger.info("{}: DMAAP SINK created", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
-
- @Override
- public String toString() {
- return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()
- + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index f258d5d9..6354f762 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -1,9 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,38 +18,39 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This implementation publishes events for the associated UEB topic, inline with the calling
+ * This implementation publishes events for the associated KAFKA topic, inline with the calling
* thread.
*/
-public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
+public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+
+ protected Map<String, String> additionalProps;
/**
- * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
+ * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
* attributes.
*
- * <p>servers list of UEB servers available for publishing
+ * <p>servers list of KAFKA servers available for publishing
* topic the topic to publish to
- * apiKey the api key (optional)
- * apiSecret the api secret (optional)
* partitionId the partition key (optional, autogenerated if not provided)
* useHttps does connection use HTTPS?
- * allowSelfSignedCerts are self-signed certificates allow
* @param busTopicParams contains attributes needed
* @throws IllegalArgumentException if invalid arguments are detected
*/
- public InlineUebTopicSink(BusTopicParams busTopicParams) {
+ public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
+ this.additionalProps = busTopicParams.getAdditionalProps();
}
/**
@@ -61,27 +59,24 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
@Override
public void init() {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
+ this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder()
.servers(this.servers)
.topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
.useHttps(this.useHttps)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
+ .allowTracing(this.allowTracing)
+ .additionalProps(this.additionalProps)
.build());
- logger.info("{}: UEB SINK created", this);
+ logger.info("{}: KAFKA SINK created", this);
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
- .append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
+ return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
+ + super.toString() + "]";
}
@Override
public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
+ return Topic.CommInfrastructure.KAFKA;
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 164f2b16..f98b481f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -2,8 +2,9 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,11 +25,9 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.UUID;
-
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
@@ -41,32 +40,36 @@ import org.slf4j.LoggerFactory;
* notifying its listeners.
*/
public abstract class SingleThreadedBusTopicSource extends BusTopicBase
- implements Runnable, BusTopicSource, FilterableTopicSource {
+ implements Runnable, BusTopicSource {
/**
* Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
* that in a single file in a concise format.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
+ private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
/**
* Bus consumer group.
*/
+ @Getter
protected final String consumerGroup;
/**
* Bus consumer instance.
*/
+ @Getter
protected final String consumerInstance;
/**
* Bus fetch timeout.
*/
+ @Getter
protected final int fetchTimeout;
/**
* Bus fetch limit.
*/
+ @Getter
protected final int fetchLimit;
/**
@@ -87,19 +90,24 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
*
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
+ protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- if (busTopicParams.isConsumerGroupInvalid()) {
+ if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) {
this.consumerGroup = UUID.randomUUID().toString();
- } else {
+ this.consumerInstance = NetworkUtil.getHostname();
+
+ } else if (busTopicParams.isConsumerGroupInvalid()) {
+ this.consumerGroup = UUID.randomUUID().toString();
+ this.consumerInstance = busTopicParams.getConsumerInstance();
+
+ } else if (busTopicParams.isConsumerInstanceInvalid()) {
this.consumerGroup = busTopicParams.getConsumerGroup();
- }
+ this.consumerInstance = UUID.randomUUID().toString();
- if (busTopicParams.isConsumerInstanceInvalid()) {
- this.consumerInstance = NetworkUtil.getHostname();
} else {
+ this.consumerGroup = busTopicParams.getConsumerGroup();
this.consumerInstance = busTopicParams.getConsumerInstance();
}
@@ -134,8 +142,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
logger.info("{}: register: start not attempted", this);
}
} catch (Exception e) {
- logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
- e);
+ logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
}
}
@@ -176,8 +183,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
busPollerThread.start();
return true;
} catch (Exception e) {
- logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
- throw new IllegalStateException(e);
+ throw new IllegalStateException(this + ": cannot start", e);
}
}
}
@@ -227,7 +233,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
try {
fetchAllMessages();
} catch (IOException | RuntimeException e) {
- logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
+ logger.error("{}: cannot fetch", this, e);
}
}
@@ -265,17 +271,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return broadcast(event);
}
-
- @Override
- public void setFilter(String filter) {
- if (consumer instanceof FilterableBusConsumer) {
- ((FilterableBusConsumer) consumer).setFilter(filter);
-
- } else {
- throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
- }
- }
-
@Override
public String toString() {
return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
@@ -285,29 +280,8 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
}
@Override
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- @Override
- public String getConsumerInstance() {
- return consumerInstance;
- }
-
- @Override
public void shutdown() {
this.stop();
this.topicListeners.clear();
}
-
- @Override
- public int getFetchTimeout() {
- return fetchTimeout;
- }
-
- @Override
- public int getFetchLimit() {
- return fetchLimit;
- }
-
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
deleted file mode 100644
index e5d08a2a..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.util.Map;
-
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
- * its listeners.
- */
-public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
-
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
-
- /**
- * Constructor.
- *
- * @param busTopicParams Parameters object containing all the required inputs
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- try {
- this.init();
- } catch (Exception e) {
- logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
- topic, e.getMessage(), e);
- throw new IllegalArgumentException(e);
- }
- }
-
-
- /**
- * Initialize the Cambria or MR Client.
- */
- @Override
- public void init() throws MalformedURLException {
- BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps);
-
- if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .build());
- }
-
- logger.info("{}: INITTED", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
- .append((password == null || password.isEmpty()) ? "-" : password.length())
- .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
- .append(super.toString()).append("]");
- return builder.toString();
- }
-
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index e210762d..869273f0 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -1,9 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,14 +18,18 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.net.MalformedURLException;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
/**
- * This topic source implementation specializes in reading messages over an UEB Bus topic source and
+ * This topic source implementation specializes in reading messages over a Kafka Bus topic source and
* notifying its listeners.
*/
-public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
+public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
+
+ protected Map<String, String> additionalProps = null;
/**
* Constructor.
@@ -36,40 +37,43 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i
* @param busTopicParams Parameters object containing all the required inputs
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) {
+ public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- this.init();
+ this.additionalProps = busTopicParams.getAdditionalProps();
+ try {
+ this.init();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e);
+ }
}
/**
* Initialize the Cambria client.
*/
@Override
- public void init() {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+ public void init() throws MalformedURLException {
+ BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
.servers(this.servers)
.topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
.fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
+ .consumerGroup(this.consumerGroup)
.useHttps(this.useHttps)
- .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
+ .allowTracing(this.allowTracing);
+
+ this.consumer = new BusConsumer.KafkaConsumerWrapper(builder
+ .additionalProps(this.additionalProps)
+ .build());
}
@Override
public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
+ return Topic.CommInfrastructure.KAFKA;
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=")
- .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
+ return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
+ + ", toString()=" + super.toString() + "]";
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
index 6f07df1b..c63fbcc2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
@@ -2,14 +2,16 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,19 +24,21 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.util.ArrayList;
import java.util.List;
-
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Getter
public abstract class TopicBase implements Topic {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
+ private static final Logger logger = LoggerFactory.getLogger(TopicBase.class);
/**
* List of servers.
@@ -72,17 +76,18 @@ public abstract class TopicBase implements Topic {
/**
* All my subscribers for new message notifications.
*/
+ @Getter(AccessLevel.NONE)
protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
/**
* Instantiates a new Topic Base.
- *
+ *
* @param servers list of servers
* @param topic topic name
- *
+ *
* @throws IllegalArgumentException if invalid parameters are present
*/
- public TopicBase(List<String> servers, String topic) {
+ protected TopicBase(List<String> servers, String topic) {
this(servers, topic, topic);
}
@@ -94,7 +99,7 @@ public abstract class TopicBase implements Topic {
*
* @throws IllegalArgumentException if invalid parameters are present
*/
- public TopicBase(List<String> servers, String topic, String effectiveTopic) {
+ protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
if (servers == null || servers.isEmpty()) {
throw new IllegalArgumentException("Server(s) must be provided");
@@ -112,8 +117,8 @@ public abstract class TopicBase implements Topic {
}
this.servers = servers;
- this.topic = topic;
- this.effectiveTopic = effectiveTopicCopy;
+ this.topic = topic.toLowerCase();
+ this.effectiveTopic = effectiveTopicCopy.toLowerCase();
}
@Override
@@ -152,14 +157,14 @@ public abstract class TopicBase implements Topic {
/**
* Broadcast event to all listeners.
- *
+ *
* @param message the event
* @return true if all notifications are performed with no error, false otherwise
*/
protected boolean broadcast(String message) {
List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
- boolean success = true;
+ var success = true;
for (TopicListener topicListener : snapshotListeners) {
try {
topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
@@ -173,7 +178,7 @@ public abstract class TopicBase implements Topic {
/**
* Take a snapshot of current topic listeners.
- *
+ *
* @return the topic listeners
*/
protected synchronized List<TopicListener> snapshotTopicListeners() {
@@ -219,33 +224,8 @@ public abstract class TopicBase implements Topic {
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
- public String getTopic() {
- return topic;
- }
-
- @Override
- public String getEffectiveTopic() {
- return effectiveTopic;
- }
-
- @Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public List<String> getServers() {
- return servers;
- }
-
- @Override
public synchronized String[] getRecentEvents() {
- String[] events = new String[recentEvents.size()];
+ var events = new String[recentEvents.size()];
return recentEvents.toArray(events);
}