diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 564 |
1 files changed, 143 insertions, 421 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 0f31bf7d..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,22 +23,31 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; -import java.util.Map; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,126 +71,54 @@ public interface BusConsumer { public void close(); /** - * BusConsumer that supports server-side filtering. + * Consumer that handles fetch() failures by sleeping. */ - public interface FilterableBusConsumer extends BusConsumer { + abstract class FetchingBusConsumer implements BusConsumer { + private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - - /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. + * Fetch timeout. */ - private final ConsumerBuilder builder; - - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** - * Cambria client. - */ - private CambriaConsumer consumer; + protected int fetchTimeout; /** - * Cambria client to use for next fetch. + * Time to sleep on a fetch failure. */ - private CambriaConsumer newConsumer = null; + @Getter + private final int sleepTime; /** - * fetch timeout. + * Counted down when {@link #close()} is invoked. */ - protected int fetchTimeout; + private final CountDownLatch closeCondition = new CountDownLatch(1); - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic - * apiKey API Key - * apiSecret API Secret - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * Constructs the object. * - * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception + * @param busTopicParams parameters for the bus topic */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - + protected FetchingBusConsumer(BusTopicParams busTopicParams) { this.fetchTimeout = busTopicParams.getFetchTimeout(); - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - try { - return getCurrentConsumer().fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - sleepAfterFetchFailure(); - throw e; + if (this.fetchTimeout <= 0) { + this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); } } - private void sleepAfterFetchFailure() { + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } } catch (InterruptedException e) { logger.warn("{}: interrupted while handling fetch error", this, e); @@ -190,365 +129,148 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } /** - * MR based consumer. + * Kafka based consumer. */ - public abstract class DmaapConsumerWrapper implements BusConsumer { + class KafkaConsumerWrapper extends FetchingBusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); - /** - * fetch timeout. - */ - protected int fetchTimeout; + private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; /** - * close condition. + * Kafka consumer. */ - protected CountDownLatch closeCondition = new CountDownLatch(1); + protected KafkaConsumer<String, String> consumer; + protected Properties kafkaProps; - /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; + protected boolean allowTracing; /** - * MR Consumer Wrapper. + * Kafka Consumer Wrapper. + * BusTopicParam - object contains the following parameters + * servers - messaging bus hosts. + * topic - topic * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * username AAF Login - * password AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed attributes - * @throws MalformedURLException URL should be valid + * @param busTopicParams - The parameters for the bus topic */ - public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - this.fetchTimeout = busTopicParams.getFetchTimeout(); + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); if (busTopicParams.isTopicInvalid()) { - throw new IllegalArgumentException("No topic for DMaaP"); + throw new IllegalArgumentException("No topic for Kafka"); } - this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(), - busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(), - busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null, - busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - } - - @Override - public Iterable<String> fetch() throws IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + busTopicParams.getServers().get(0)); - sleepAfterFetchFailure(); - - /* fall through */ - } + if (busTopicParams.isAdditionalPropsValid()) { + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); + if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - private void sleepAfterFetchFailure() { - try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR - - } catch (InterruptedException e) { - logger.warn("{}: interrupted while handling fetch error", this, e); - Thread.currentThread().interrupt(); + if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - @Override - public void close() { - this.closeCondition.countDown(); - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - /** - * MR based consumer. - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; - - /** - * BusTopicParams contain the following parameters. - * MR Consumer Wrapper. - * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * aafLogin AAF Login - * aafPassword AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed params - * @throws MalformedURLException URL should be valid - */ - public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - super(busTopicParams); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if (busTopicParams.isServersInvalid()) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); + if (busTopicParams.isAllowTracing()) { + this.allowTracing = true; + kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); } - this.consumer.setProps(props); - logger.info("{}: CREATION", this); + consumer = new KafkaConsumer<>(kafkaProps); + //Subscribe to the topic + consumer.subscribe(List.of(busTopicParams.getTopic())); } @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private final Properties props; - - /** - * Constructor. - * - * @param busTopicParams topic paramters - * - * @throws MalformedURLException must provide a valid URL - */ - public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - - super(busTopicParams); - - - final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); - - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + public Iterable<String> fetch() { + ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); + if (records == null || records.count() <= 0) { + return Collections.emptyList(); } + List<String> messages = new ArrayList<>(records.count()); + try { + if (allowTracing) { + createParentTraceContext(records); + } - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + for (TopicPartition partition : records.partitions()) { + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> partitionRecord : partitionRecords) { + messages.add(partitionRecord.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } catch (Exception e) { + logger.error("{}: cannot fetch, throwing exception after sleep...", this); + sleepAfterFetchFailure(); + throw e; } + return messages; + } - final String serviceName = busTopicParams.getServers().get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - - props = new Properties(); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", busTopicParams.getUserName()); - props.setProperty("password", busTopicParams.getPassword()); - - /* These are required, no defaults */ - props.setProperty("topic", busTopicParams.getTopic()); - - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + private void createParentTraceContext(ConsumerRecords<String, String> records) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + for (ConsumerRecord<String, String> consumerRecord : records) { - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); + Headers consumerRecordHeaders = consumerRecord.headers(); + traceParentInfo = processTraceParentHeader(consumerRecordHeaders); } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); + SpanContext spanContext = SpanContext.createFromRemoteParent( + traceParentInfo.getTraceId(), traceParentInfo.getSpanId(), + TraceFlags.getSampled(), TraceState.builder().build()); - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } + Context.current().with(Span.wrap(spanContext)).makeCurrent(); + } - props.setProperty("contenttype", "application/json"); + private TraceParentInfo processTraceParentHeader(Headers headers) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + if (headers.lastHeader("traceparent") != null) { + traceParentInfo.setParentTraceId(new String(headers.lastHeader( + "traceparent").value(), StandardCharsets.UTF_8)); - if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } + String[] parts = traceParentInfo.getParentTraceId().split("-"); + traceParentInfo.setTraceId(parts[1]); + traceParentInfo.setSpanId(parts[2]); } - MRClientFactory.prop = props; - this.consumer.setProps(props); + return traceParentInfo; + } - logger.info("{}: CREATION", this); + @Data + @NoArgsConstructor + private static class TraceParentInfo { + private String parentTraceId; + private String traceId; + private String spanId; } - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); + @Override + public void close() { + super.close(); + this.consumer.close(); + logger.info("Kafka Consumer exited {}", this); + } + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } } |