diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
11 files changed, 364 insertions, 1232 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 0f31bf7d..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,22 +23,31 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; -import java.util.Map; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,126 +71,54 @@ public interface BusConsumer { public void close(); /** - * BusConsumer that supports server-side filtering. + * Consumer that handles fetch() failures by sleeping. */ - public interface FilterableBusConsumer extends BusConsumer { + abstract class FetchingBusConsumer implements BusConsumer { + private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - - /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. + * Fetch timeout. */ - private final ConsumerBuilder builder; - - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** - * Cambria client. - */ - private CambriaConsumer consumer; + protected int fetchTimeout; /** - * Cambria client to use for next fetch. + * Time to sleep on a fetch failure. */ - private CambriaConsumer newConsumer = null; + @Getter + private final int sleepTime; /** - * fetch timeout. + * Counted down when {@link #close()} is invoked. */ - protected int fetchTimeout; + private final CountDownLatch closeCondition = new CountDownLatch(1); - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic - * apiKey API Key - * apiSecret API Secret - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * Constructs the object. * - * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception + * @param busTopicParams parameters for the bus topic */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - + protected FetchingBusConsumer(BusTopicParams busTopicParams) { this.fetchTimeout = busTopicParams.getFetchTimeout(); - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - try { - return getCurrentConsumer().fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - sleepAfterFetchFailure(); - throw e; + if (this.fetchTimeout <= 0) { + this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); } } - private void sleepAfterFetchFailure() { + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } } catch (InterruptedException e) { logger.warn("{}: interrupted while handling fetch error", this, e); @@ -190,365 +129,148 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } /** - * MR based consumer. + * Kafka based consumer. */ - public abstract class DmaapConsumerWrapper implements BusConsumer { + class KafkaConsumerWrapper extends FetchingBusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); - /** - * fetch timeout. - */ - protected int fetchTimeout; + private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; /** - * close condition. + * Kafka consumer. */ - protected CountDownLatch closeCondition = new CountDownLatch(1); + protected KafkaConsumer<String, String> consumer; + protected Properties kafkaProps; - /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; + protected boolean allowTracing; /** - * MR Consumer Wrapper. + * Kafka Consumer Wrapper. + * BusTopicParam - object contains the following parameters + * servers - messaging bus hosts. + * topic - topic * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * username AAF Login - * password AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed attributes - * @throws MalformedURLException URL should be valid + * @param busTopicParams - The parameters for the bus topic */ - public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - this.fetchTimeout = busTopicParams.getFetchTimeout(); + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); if (busTopicParams.isTopicInvalid()) { - throw new IllegalArgumentException("No topic for DMaaP"); + throw new IllegalArgumentException("No topic for Kafka"); } - this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(), - busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(), - busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null, - busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - } - - @Override - public Iterable<String> fetch() throws IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + busTopicParams.getServers().get(0)); - sleepAfterFetchFailure(); - - /* fall through */ - } + if (busTopicParams.isAdditionalPropsValid()) { + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); + if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - private void sleepAfterFetchFailure() { - try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR - - } catch (InterruptedException e) { - logger.warn("{}: interrupted while handling fetch error", this, e); - Thread.currentThread().interrupt(); + if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - @Override - public void close() { - this.closeCondition.countDown(); - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - /** - * MR based consumer. - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; - - /** - * BusTopicParams contain the following parameters. - * MR Consumer Wrapper. - * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * aafLogin AAF Login - * aafPassword AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed params - * @throws MalformedURLException URL should be valid - */ - public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - super(busTopicParams); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if (busTopicParams.isServersInvalid()) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); + if (busTopicParams.isAllowTracing()) { + this.allowTracing = true; + kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); } - this.consumer.setProps(props); - logger.info("{}: CREATION", this); + consumer = new KafkaConsumer<>(kafkaProps); + //Subscribe to the topic + consumer.subscribe(List.of(busTopicParams.getTopic())); } @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private final Properties props; - - /** - * Constructor. - * - * @param busTopicParams topic paramters - * - * @throws MalformedURLException must provide a valid URL - */ - public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - - super(busTopicParams); - - - final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); - - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + public Iterable<String> fetch() { + ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); + if (records == null || records.count() <= 0) { + return Collections.emptyList(); } + List<String> messages = new ArrayList<>(records.count()); + try { + if (allowTracing) { + createParentTraceContext(records); + } - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + for (TopicPartition partition : records.partitions()) { + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> partitionRecord : partitionRecords) { + messages.add(partitionRecord.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } catch (Exception e) { + logger.error("{}: cannot fetch, throwing exception after sleep...", this); + sleepAfterFetchFailure(); + throw e; } + return messages; + } - final String serviceName = busTopicParams.getServers().get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - - props = new Properties(); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", busTopicParams.getUserName()); - props.setProperty("password", busTopicParams.getPassword()); - - /* These are required, no defaults */ - props.setProperty("topic", busTopicParams.getTopic()); - - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + private void createParentTraceContext(ConsumerRecords<String, String> records) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + for (ConsumerRecord<String, String> consumerRecord : records) { - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); + Headers consumerRecordHeaders = consumerRecord.headers(); + traceParentInfo = processTraceParentHeader(consumerRecordHeaders); } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); + SpanContext spanContext = SpanContext.createFromRemoteParent( + traceParentInfo.getTraceId(), traceParentInfo.getSpanId(), + TraceFlags.getSampled(), TraceState.builder().build()); - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } + Context.current().with(Span.wrap(spanContext)).makeCurrent(); + } - props.setProperty("contenttype", "application/json"); + private TraceParentInfo processTraceParentHeader(Headers headers) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + if (headers.lastHeader("traceparent") != null) { + traceParentInfo.setParentTraceId(new String(headers.lastHeader( + "traceparent").value(), StandardCharsets.UTF_8)); - if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } + String[] parts = traceParentInfo.getParentTraceId().split("-"); + traceParentInfo.setTraceId(parts[1]); + traceParentInfo.setSpanId(parts[2]); } - MRClientFactory.prop = props; - this.consumer.setProps(props); + return traceParentInfo; + } - logger.info("{}: CREATION", this); + @Data + @NoArgsConstructor + private static class TraceParentInfo { + private String parentTraceId; + private String traceId; + private String spanId; } - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); + @Override + public void close() { + super.close(); + this.consumer.close(); + logger.info("Kafka Consumer exited {}", this); + } + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 469794c7..1b57e48e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,29 +23,22 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.apiClient.http.HttpClient.ConnectionType; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.fasterxml.jackson.annotation.JsonIgnore; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; +import java.util.UUID; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public interface BusPublisher { + String NO_MESSAGE_PROVIDED = "No message provided"; + String LOG_CLOSE = "{}: CLOSE"; + String LOG_CLOSE_FAILED = "{}: CLOSE FAILED"; + /** * sends a message. * @@ -52,73 +47,76 @@ public interface BusPublisher { * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided */ - public boolean send(String partitionId, String message); + boolean send(String partitionId, String message); /** * closes the publisher. */ - public void close(); + void close(); /** - * Cambria based library publisher. + * Kafka based library publisher. */ - public static class CambriaPublisherWrapper implements BusPublisher { + class KafkaPublisherWrapper implements BusPublisher { + + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + private final String topic; /** - * The actual Cambria publisher. + * Kafka publisher. */ - @JsonIgnore - @GsonJsonIgnore - protected CambriaBatchingPublisher publisher; + private final Producer<String, String> producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public CambriaPublisherWrapper(BusTopicParams busTopicParams) { - - PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (busTopicParams.isUseHttps()) { - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); - } else { - builder.withConnectionType(ConnectionType.HTTPS); - } + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); } + this.topic = busTopicParams.getTopic(); - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); + // Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); } - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); + if (busTopicParams.isAllowTracing()) { + kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); } + + producer = new KafkaProducer<>(kafkaProps); } @Override public boolean send(String partitionId, String message) { if (message == null) { - throw new IllegalArgumentException("No message provided"); + throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); } try { - this.publisher.send(partitionId, message); + // Create the record + ProducerRecord<String, String> producerRecord = + new ProducerRecord<>(topic, UUID.randomUUID().toString(), message); + + this.producer.send(producerRecord); + producer.flush(); } catch (Exception e) { logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); return false; @@ -128,263 +126,19 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - - } - - /** - * DmaapClient library wrapper. - */ - public abstract class DmaapPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); - - /** - * MR based Publisher. - */ - protected MRSimplerBatchPublisher publisher; - protected Properties props; - - /** - * MR Publisher Wrapper. - * - * @param servers messaging bus hosts - * @param topic topic - * @param username AAF or DME2 Login - * @param password AAF or DME2 Password - */ - public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, - String username, String password, boolean useHttps) { - - - if (StringUtils.isBlank(topic)) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - - configureProtocol(topic, protocol, servers, useHttps); - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - - props.setProperty("Protocol", (useHttps ? "https" : "http")); - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); - } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); - } - - private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, - boolean useHttps) { - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); - } - - ArrayList<String> dmaapServers = new ArrayList<>(); - String port = useHttps ? ":3905" : ":3904"; - for (String server : servers) { - dmaapServers.add(server + port); - } - - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - } else if (protocol == ProtocolTypeConstants.DME2) { - ArrayList<String> dmaapServers = new ArrayList<>(); - dmaapServers.add("0.0.0.0:3904"); - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - } - - @Override - public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { - this.publisher.close(1, TimeUnit.SECONDS); + this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); } } @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException("No message provided"); - } - - this.publisher.setPubResponse(new MRPublisherResponse()); - this.publisher.send(partitionId, message); - MRPublisherResponse response = this.publisher.sendBatchWithResponse(); - if (response != null) { - logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - } - - return true; - } - - @Override public String toString() { - return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() - + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" - + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() - + ", publisher.getUsername()=" + publisher.getUsername() + "]"; + return "KafkaPublisherWrapper []"; } - } - - /** - * DmaapClient library wrapper. - */ - public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { - /** - * MR based Publisher. - */ - public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword, - boolean useHttps) { - - super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); - } - } - - public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - - /** - * Constructor. - * - * @param busTopicParams topic parameters - */ - public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { - - super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), - busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); - - String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null; - validateParams(busTopicParams, dme2RouteOffer); - - String serviceName = busTopicParams.getServers().get(0); - - /* These are required, no defaults */ - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "POST"); - - if (busTopicParams.isAdditionalPropsValid()) { - addAdditionalProps(busTopicParams); - } - - this.publisher.setProps(props); - } - - private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - } - - private void addAdditionalProps(BusTopicParams busTopicParams) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index ccf25753..f8236d3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -2,14 +2,15 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,11 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled; /** * Bus Topic Base. */ +@Getter public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { /** @@ -43,58 +46,37 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { protected boolean useHttps; /** + * Allow tracing. + */ + protected boolean allowTracing; + + /** * allow self signed certificates. */ protected boolean allowSelfSignedCerts; /** * Instantiates a new Bus Topic Base. - * + * * <p>servers list of servers * topic topic name * apiKey API Key * apiSecret API Secret * useHttps does connection use HTTPS? + * allowTracing Is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams holds all our parameters * @throws IllegalArgumentException if invalid parameters are present */ - public BusTopicBase(BusTopicParams busTopicParams) { + protected BusTopicBase(BusTopicParams busTopicParams) { super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getEffectiveTopic()); this.apiKey = busTopicParams.getApiKey(); this.apiSecret = busTopicParams.getApiSecret(); this.useHttps = busTopicParams.isUseHttps(); + this.allowTracing = busTopicParams.isAllowTracing(); this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } - @Override - public String getApiKey() { - return apiKey; - } - - @Override - public String getApiSecret() { - return apiSecret; - } - - /** - * Is using HTTPS. - * - * @return if using https - */ - public boolean isUseHttps() { - return useHttps; - } - - /** - * Is self signed certificates allowed. - * - * @return if self signed certificates are allowed - */ - public boolean isAllowSelfSignedCerts() { - return allowSelfSignedCerts; - } - protected boolean anyNullOrEmpty(String... args) { for (String arg : args) { if (arg == null || arg.isEmpty()) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index 9df7221f..53a6ab66 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -3,8 +3,8 @@ * ONAP * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. - * Modifications Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,21 +24,23 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.List; import java.util.Map; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.commons.lang3.StringUtils; /** * Member variables of this Params class are as follows. * - * <p>servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * consumerGroup DMaaP Reader Consumer Group - * consumerInstance DMaaP Reader Instance - * fetchTimeout DMaaP fetch timeout - * fetchLimit DMaaP fetch limit + * <p>servers Kafka servers + * topic Kafka Topic to be monitored + * apiKey Kafka API Key (optional) + * apiSecret Kafka API Secret (optional) + * consumerGroup kafka Reader Consumer Group + * consumerInstance Kafka Reader Instance + * fetchTimeout kafka fetch timeout + * fetchLimit Kafka fetch limit * environment DME2 Environment * aftEnvironment DME2 AFT Environment * partner DME2 Partner @@ -46,6 +48,7 @@ import org.apache.commons.lang3.StringUtils; * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is message tracing allowed? * allowSelfSignedCerts are self-signed certificates allow */ @Getter @@ -64,6 +67,7 @@ public class BusTopicParams { private int fetchTimeout; private int fetchLimit; private boolean useHttps; + private boolean allowTracing; private boolean allowSelfSignedCerts; private boolean managed; @@ -78,6 +82,7 @@ public class BusTopicParams { private String clientName; private String hostname; private String basePath; + @Getter private String serializationProvider; public static TopicParamsBuilder builder() { @@ -165,29 +170,43 @@ public class BusTopicParams { return additionalProps != null; } - public String getSerializationProvider() { - return serializationProvider; + public void setEffectiveTopic(String effectiveTopic) { + this.effectiveTopic = topicToLowerCase(effectiveTopic); } + public void setTopic(String topic) { + this.topic = topicToLowerCase(topic); + } + + public String getEffectiveTopic() { + return topicToLowerCase(effectiveTopic); + } + + public String getTopic() { + return topicToLowerCase(topic); + } + + private String topicToLowerCase(String topic) { + return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase(); + } + + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TopicParamsBuilder { final BusTopicParams params = new BusTopicParams(); - private TopicParamsBuilder() { - } - public TopicParamsBuilder servers(List<String> servers) { this.params.servers = servers; return this; } public TopicParamsBuilder topic(String topic) { - this.params.topic = topic; + this.params.setTopic(topic); return this; } public TopicParamsBuilder effectiveTopic(String effectiveTopic) { - this.params.effectiveTopic = effectiveTopic; + this.params.setEffectiveTopic(effectiveTopic); return this; } @@ -226,6 +245,11 @@ public class BusTopicParams { return this; } + public TopicParamsBuilder allowTracing(boolean allowTracing) { + this.params.allowTracing = allowTracing; + return this; + } + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { this.params.allowSelfSignedCerts = allowSelfSignedCerts; return this; @@ -309,7 +333,6 @@ public class BusTopicParams { this.params.serializationProvider = serializationProvider; return this; } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index e94bdffa..9b724072 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +24,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.UUID; - +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -30,8 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink - * regardless if it is UEB or DMaaP. + * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink. * */ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { @@ -44,7 +46,9 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi /** * The partition key to publish to. */ - protected String partitionId; + @Getter + @Setter + protected String partitionKey; /** * Message bus publisher. @@ -60,17 +64,18 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * apiSecret api secret * partitionId partition id * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * - * @throws IllegalArgumentException in invalid parameters are passed in + * @throws IllegalArgumentException if invalid parameters are passed in */ - public InlineBusTopicSink(BusTopicParams busTopicParams) { + protected InlineBusTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); if (busTopicParams.isPartitionIdInvalid()) { - this.partitionId = UUID.randomUUID().toString(); + this.partitionKey = UUID.randomUUID().toString(); } else { - this.partitionId = busTopicParams.getPartitionId(); + this.partitionKey = busTopicParams.getPartitionId(); } } @@ -84,17 +89,14 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi logger.info("{}: starting", this); synchronized (this) { + if (!this.alive) { + if (locked) { + throw new IllegalStateException(this + " is locked."); + } - if (this.alive) { - return true; - } - - if (locked) { - throw new IllegalStateException(this + " is locked."); + this.init(); + this.alive = true; } - - this.init(); - this.alive = true; } return true; @@ -142,7 +144,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message); - publisher.send(this.partitionId, message); + publisher.send(this.partitionKey, message); broadcast(message); } catch (Exception e) { logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); @@ -153,44 +155,13 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi } @Override - public void setPartitionKey(String partitionKey) { - this.partitionId = partitionKey; - } - - @Override - public String getPartitionKey() { - return this.partitionId; - } - - @Override public void shutdown() { this.stop(); } @Override - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - @Override - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - @Override public String toString() { - return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; + return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher + + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java deleted file mode 100644 index ba556bb8..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ /dev/null @@ -1,130 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.util.Map; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation publishes events for the associated DMAAP topic, inline with the calling - * thread. - */ -public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { - - protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class); - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - /** - * BusTopicParams contains the below mentioned attributes. - * servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * environment DME2 Environment - * aftEnvironment DME2 AFT Environment - * partner DME2 Partner - * latitude DME2 Latitude - * longitude DME2 Longitude - * additionalProps Additional properties to pass to DME2 - * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams Contains the above mentioned parameters - * @throws IllegalArgumentException An invalid parameter passed in - */ - public InlineDmaapTopicSink(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - } - - - @Override - public void init() { - if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .userName(this.userName) - .password(this.password) - .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .useHttps(this.useHttps) - .build()); - } - - logger.info("{}: DMAAP SINK created", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - - @Override - public String toString() { - return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password - + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString() - + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index f258d5d9..6354f762 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -1,9 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,38 +18,39 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This implementation publishes events for the associated UEB topic, inline with the calling + * This implementation publishes events for the associated KAFKA topic, inline with the calling * thread. */ -public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { +public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); + private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + + protected Map<String, String> additionalProps; /** - * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below * attributes. * - * <p>servers list of UEB servers available for publishing + * <p>servers list of KAFKA servers available for publishing * topic the topic to publish to - * apiKey the api key (optional) - * apiSecret the api secret (optional) * partitionId the partition key (optional, autogenerated if not provided) * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected */ - public InlineUebTopicSink(BusTopicParams busTopicParams) { + public InlineKafkaTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); + this.additionalProps = busTopicParams.getAdditionalProps(); } /** @@ -61,27 +59,24 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi @Override public void init() { - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts) + .allowTracing(this.allowTracing) + .additionalProps(this.additionalProps) .build()); - logger.info("{}: UEB SINK created", this); + logger.info("{}: KAFKA SINK created", this); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); + return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; } @Override public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; + return Topic.CommInfrastructure.KAFKA; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 164f2b16..f98b481f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +25,9 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; - -import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -41,32 +40,36 @@ import org.slf4j.LoggerFactory; * notifying its listeners. */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource, FilterableTopicSource { + implements Runnable, BusTopicSource { /** * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only * that in a single file in a concise format. */ - private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); + private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class); /** * Bus consumer group. */ + @Getter protected final String consumerGroup; /** * Bus consumer instance. */ + @Getter protected final String consumerInstance; /** * Bus fetch timeout. */ + @Getter protected final int fetchTimeout; /** * Bus fetch limit. */ + @Getter protected final int fetchLimit; /** @@ -87,19 +90,24 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase * * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { + protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - if (busTopicParams.isConsumerGroupInvalid()) { + if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = UUID.randomUUID().toString(); - } else { + this.consumerInstance = NetworkUtil.getHostname(); + + } else if (busTopicParams.isConsumerGroupInvalid()) { + this.consumerGroup = UUID.randomUUID().toString(); + this.consumerInstance = busTopicParams.getConsumerInstance(); + + } else if (busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = busTopicParams.getConsumerGroup(); - } + this.consumerInstance = UUID.randomUUID().toString(); - if (busTopicParams.isConsumerInstanceInvalid()) { - this.consumerInstance = NetworkUtil.getHostname(); } else { + this.consumerGroup = busTopicParams.getConsumerGroup(); this.consumerInstance = busTopicParams.getConsumerInstance(); } @@ -134,8 +142,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: register: start not attempted", this); } } catch (Exception e) { - logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(), - e); + logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e); } } @@ -176,8 +183,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase busPollerThread.start(); return true; } catch (Exception e) { - logger.warn("{}: cannot start because of {}", this, e.getMessage(), e); - throw new IllegalStateException(e); + throw new IllegalStateException(this + ": cannot start", e); } } } @@ -227,7 +233,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase try { fetchAllMessages(); } catch (IOException | RuntimeException e) { - logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); + logger.error("{}: cannot fetch", this, e); } } @@ -265,17 +271,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return broadcast(event); } - - @Override - public void setFilter(String filter) { - if (consumer instanceof FilterableBusConsumer) { - ((FilterableBusConsumer) consumer).setFilter(filter); - - } else { - throw new UnsupportedOperationException("no server-side filtering for topic " + topic); - } - } - @Override public String toString() { return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance @@ -285,29 +280,8 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } @Override - public String getConsumerGroup() { - return consumerGroup; - } - - @Override - public String getConsumerInstance() { - return consumerInstance; - } - - @Override public void shutdown() { this.stop(); this.topicListeners.clear(); } - - @Override - public int getFetchTimeout() { - return fetchTimeout; - } - - @Override - public int getFetchLimit() { - return fetchLimit; - } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java deleted file mode 100644 index e5d08a2a..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ /dev/null @@ -1,143 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.util.Map; - -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This topic reader implementation specializes in reading messages over DMAAP topic and notifying - * its listeners. - */ -public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { - - private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class); - - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - - /** - * Constructor. - * - * @param busTopicParams Parameters object containing all the required inputs - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - try { - this.init(); - } catch (Exception e) { - logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", - topic, e.getMessage(), e); - throw new IllegalArgumentException(e); - } - } - - - /** - * Initialize the Cambria or MR Client. - */ - @Override - public void init() throws MalformedURLException { - BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps); - - if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .build()); - } - - logger.info("{}: INITTED", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") - .append((password == null || password.isEmpty()) ? "-" : password.length()) - .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") - .append(super.toString()).append("]"); - return builder.toString(); - } - - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index e210762d..869273f0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -1,9 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +18,18 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.net.MalformedURLException; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; /** - * This topic source implementation specializes in reading messages over an UEB Bus topic source and + * This topic source implementation specializes in reading messages over a Kafka Bus topic source and * notifying its listeners. */ -public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { +public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + + protected Map<String, String> additionalProps = null; /** * Constructor. @@ -36,40 +37,43 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i * @param busTopicParams Parameters object containing all the required inputs * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) { + public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - this.init(); + this.additionalProps = busTopicParams.getAdditionalProps(); + try { + this.init(); + } catch (Exception e) { + throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e); + } } /** * Initialize the Cambria client. */ @Override - public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) + .consumerGroup(this.consumerGroup) .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); + .allowTracing(this.allowTracing); + + this.consumer = new BusConsumer.KafkaConsumerWrapper(builder + .additionalProps(this.additionalProps) + .build()); } @Override public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; + return Topic.CommInfrastructure.KAFKA; } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") - .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); + return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + + ", toString()=" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index 6f07df1b..c63fbcc2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -2,14 +2,16 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,19 +24,21 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.ArrayList; import java.util.List; - +import lombok.AccessLevel; +import lombok.Getter; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Getter public abstract class TopicBase implements Topic { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(TopicBase.class); + private static final Logger logger = LoggerFactory.getLogger(TopicBase.class); /** * List of servers. @@ -72,17 +76,18 @@ public abstract class TopicBase implements Topic { /** * All my subscribers for new message notifications. */ + @Getter(AccessLevel.NONE) protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); /** * Instantiates a new Topic Base. - * + * * @param servers list of servers * @param topic topic name - * + * * @throws IllegalArgumentException if invalid parameters are present */ - public TopicBase(List<String> servers, String topic) { + protected TopicBase(List<String> servers, String topic) { this(servers, topic, topic); } @@ -94,7 +99,7 @@ public abstract class TopicBase implements Topic { * * @throws IllegalArgumentException if invalid parameters are present */ - public TopicBase(List<String> servers, String topic, String effectiveTopic) { + protected TopicBase(List<String> servers, String topic, String effectiveTopic) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("Server(s) must be provided"); @@ -112,8 +117,8 @@ public abstract class TopicBase implements Topic { } this.servers = servers; - this.topic = topic; - this.effectiveTopic = effectiveTopicCopy; + this.topic = topic.toLowerCase(); + this.effectiveTopic = effectiveTopicCopy.toLowerCase(); } @Override @@ -152,14 +157,14 @@ public abstract class TopicBase implements Topic { /** * Broadcast event to all listeners. - * + * * @param message the event * @return true if all notifications are performed with no error, false otherwise */ protected boolean broadcast(String message) { List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); - boolean success = true; + var success = true; for (TopicListener topicListener : snapshotListeners) { try { topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); @@ -173,7 +178,7 @@ public abstract class TopicBase implements Topic { /** * Take a snapshot of current topic listeners. - * + * * @return the topic listeners */ protected synchronized List<TopicListener> snapshotTopicListeners() { @@ -219,33 +224,8 @@ public abstract class TopicBase implements Topic { } @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public String getTopic() { - return topic; - } - - @Override - public String getEffectiveTopic() { - return effectiveTopic; - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public List<String> getServers() { - return servers; - } - - @Override public synchronized String[] getRecentEvents() { - String[] events = new String[recentEvents.size()]; + var events = new String[recentEvents.size()]; return recentEvents.toArray(events); } |