aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java564
1 files changed, 143 insertions, 421 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 0f31bf7d..b46c2715 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -2,8 +2,10 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,22 +23,31 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,126 +71,54 @@ public interface BusConsumer {
public void close();
/**
- * BusConsumer that supports server-side filtering.
+ * Consumer that handles fetch() failures by sleeping.
*/
- public interface FilterableBusConsumer extends BusConsumer {
+ abstract class FetchingBusConsumer implements BusConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
/**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws IllegalArgumentException if the consumer cannot be built with the new filter
- */
- public void setFilter(String filter);
- }
-
- /**
- * Cambria based consumer.
- */
- public static class CambriaConsumerWrapper implements FilterableBusConsumer {
-
- /**
- * logger.
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
+ * Fetch timeout.
*/
- private final ConsumerBuilder builder;
-
- /**
- * Locked while updating {@link #consumer} and {@link #newConsumer}.
- */
- private final Object consLocker = new Object();
-
- /**
- * Cambria client.
- */
- private CambriaConsumer consumer;
+ protected int fetchTimeout;
/**
- * Cambria client to use for next fetch.
+ * Time to sleep on a fetch failure.
*/
- private CambriaConsumer newConsumer = null;
+ @Getter
+ private final int sleepTime;
/**
- * fetch timeout.
+ * Counted down when {@link #close()} is invoked.
*/
- protected int fetchTimeout;
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
- /**
- * close condition.
- */
- protected CountDownLatch closeCondition = new CountDownLatch(1);
/**
- * Cambria Consumer Wrapper.
- * BusTopicParam object contains the following parameters
- * servers messaging bus hosts.
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
+ * Constructs the object.
*
- * @param busTopicParams - The parameters for the bus topic
- * @throws GeneralSecurityException - Security exception
- * @throws MalformedURLException - Malformed URL exception
+ * @param busTopicParams parameters for the bus topic
*/
- public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
-
+ protected FetchingBusConsumer(BusTopicParams busTopicParams) {
this.fetchTimeout = busTopicParams.getFetchTimeout();
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
- .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
- .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (busTopicParams.isUseHttps()) {
- builder.usingHttps();
-
- if (busTopicParams.isAllowSelfSignedCerts()) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
- builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- }
-
- if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
- builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- try {
- return getCurrentConsumer().fetch();
- } catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
- sleepAfterFetchFailure();
- throw e;
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
}
}
- private void sleepAfterFetchFailure() {
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
} catch (InterruptedException e) {
logger.warn("{}: interrupted while handling fetch error", this, e);
@@ -190,365 +129,148 @@ public interface BusConsumer {
@Override
public void close() {
this.closeCondition.countDown();
- getCurrentConsumer().close();
- }
-
- private CambriaConsumer getCurrentConsumer() {
- CambriaConsumer old = null;
- CambriaConsumer ret;
-
- synchronized (consLocker) {
- if (this.newConsumer != null) {
- // replace old consumer with new consumer
- old = this.consumer;
- this.consumer = this.newConsumer;
- this.newConsumer = null;
- }
-
- ret = this.consumer;
- }
-
- if (old != null) {
- old.close();
- }
-
- return ret;
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
- builder.withServerSideFilter(filter);
-
- try {
- CambriaConsumer previous;
- synchronized (consLocker) {
- previous = this.newConsumer;
- this.newConsumer = builder.build();
- }
-
- if (previous != null) {
- // there was already a new consumer - close it
- previous.close();
- }
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- /*
- * Since an exception occurred, "consumer" still has its old value, thus it should
- * not be closed at this point.
- */
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
/**
- * MR based consumer.
+ * Kafka based consumer.
*/
- public abstract class DmaapConsumerWrapper implements BusConsumer {
+ class KafkaConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
*/
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
- /**
- * fetch timeout.
- */
- protected int fetchTimeout;
+ private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
/**
- * close condition.
+ * Kafka consumer.
*/
- protected CountDownLatch closeCondition = new CountDownLatch(1);
+ protected KafkaConsumer<String, String> consumer;
+ protected Properties kafkaProps;
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
+ protected boolean allowTracing;
/**
- * MR Consumer Wrapper.
+ * Kafka Consumer Wrapper.
+ * BusTopicParam - object contains the following parameters
+ * servers - messaging bus hosts.
+ * topic - topic
*
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * username AAF Login
- * password AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
- *
- * @param busTopicParams contains above listed attributes
- * @throws MalformedURLException URL should be valid
+ * @param busTopicParams - The parameters for the bus topic
*/
- public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- this.fetchTimeout = busTopicParams.getFetchTimeout();
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+ super(busTopicParams);
if (busTopicParams.isTopicInvalid()) {
- throw new IllegalArgumentException("No topic for DMaaP");
+ throw new IllegalArgumentException("No topic for Kafka");
}
- this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
- busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
- busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
- busTopicParams.getApiKey(), busTopicParams.getApiSecret());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ busTopicParams.getServers().get(0));
- sleepAfterFetchFailure();
-
- /* fall through */
- }
+ if (busTopicParams.isAdditionalPropsValid()) {
+ kafkaProps.putAll(busTopicParams.getAdditionalProps());
}
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
+ if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
}
- }
-
- private void sleepAfterFetchFailure() {
- try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
- } catch (InterruptedException e) {
- logger.warn("{}: interrupted while handling fetch error", this, e);
- Thread.currentThread().interrupt();
+ if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
}
- }
-
- @Override
- public void close() {
- this.closeCondition.countDown();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- /**
- * MR based consumer.
- */
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * BusTopicParams contain the following parameters.
- * MR Consumer Wrapper.
- *
- * <p>servers messaging bus hosts
- * topic topic
- * apiKey API Key
- * apiSecret API Secret
- * aafLogin AAF Login
- * aafPassword AAF Password
- * consumerGroup Consumer Group
- * consumerInstance Consumer Instance
- * fetchTimeout Fetch Timeout
- * fetchLimit Fetch Limit
- *
- * @param busTopicParams contains above listed params
- * @throws MalformedURLException URL should be valid
- */
- public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- super(busTopicParams);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if (busTopicParams.isServersInvalid()) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
}
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
+ if (busTopicParams.isAllowTracing()) {
+ this.allowTracing = true;
+ kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
}
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
+ consumer = new KafkaConsumer<>(kafkaProps);
+ //Subscribe to the topic
+ consumer.subscribe(List.of(busTopicParams.getTopic()));
}
@Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic paramters
- *
- * @throws MalformedURLException must provide a valid URL
- */
- public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
-
- super(busTopicParams);
-
-
- final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
-
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ public Iterable<String> fetch() {
+ ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+ if (records == null || records.count() <= 0) {
+ return Collections.emptyList();
}
+ List<String> messages = new ArrayList<>(records.count());
+ try {
+ if (allowTracing) {
+ createParentTraceContext(records);
+ }
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
+ messages.add(partitionRecord.value());
+ }
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch, throwing exception after sleep...", this);
+ sleepAfterFetchFailure();
+ throw e;
}
+ return messages;
+ }
- final String serviceName = busTopicParams.getServers().get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
-
- props = new Properties();
-
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", busTopicParams.getUserName());
- props.setProperty("password", busTopicParams.getPassword());
-
- /* These are required, no defaults */
- props.setProperty("topic", busTopicParams.getTopic());
-
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
+ private void createParentTraceContext(ConsumerRecords<String, String> records) {
+ TraceParentInfo traceParentInfo = new TraceParentInfo();
+ for (ConsumerRecord<String, String> consumerRecord : records) {
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
+ Headers consumerRecordHeaders = consumerRecord.headers();
+ traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
}
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
+ SpanContext spanContext = SpanContext.createFromRemoteParent(
+ traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
+ TraceFlags.getSampled(), TraceState.builder().build());
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
+ Context.current().with(Span.wrap(spanContext)).makeCurrent();
+ }
- props.setProperty("contenttype", "application/json");
+ private TraceParentInfo processTraceParentHeader(Headers headers) {
+ TraceParentInfo traceParentInfo = new TraceParentInfo();
+ if (headers.lastHeader("traceparent") != null) {
+ traceParentInfo.setParentTraceId(new String(headers.lastHeader(
+ "traceparent").value(), StandardCharsets.UTF_8));
- if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
+ String[] parts = traceParentInfo.getParentTraceId().split("-");
+ traceParentInfo.setTraceId(parts[1]);
+ traceParentInfo.setSpanId(parts[2]);
}
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
+ return traceParentInfo;
+ }
- logger.info("{}: CREATION", this);
+ @Data
+ @NoArgsConstructor
+ private static class TraceParentInfo {
+ private String parentTraceId;
+ private String traceId;
+ private String spanId;
}
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
+ @Override
+ public void close() {
+ super.close();
+ this.consumer.close();
+ logger.info("Kafka Consumer exited {}", this);
+ }
+ @Override
+ public String toString() {
+ return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
}