diff options
author | adheli.tavares <adheli.tavares@est.tech> | 2024-04-02 15:18:20 +0100 |
---|---|---|
committer | adheli.tavares <adheli.tavares@est.tech> | 2024-04-10 13:28:45 +0100 |
commit | 7e934a6e435c62b1188b44ba5bdc3985328a85fc (patch) | |
tree | fb3379a6710e46bc14423475fc0b3c9e4a47275f /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | |
parent | 90901c96e348e8ee65f218b90e46a25d4b85f96d (diff) |
Dependency management update
- including dependencies to pom.xml files only where they are used,
avoiding extra dependencies being added in all packages.
- removal of unused UEB topic.
Issue-ID: POLICY-4945
Change-Id: Ifc0212af2bc938e357e1addebcec591f9d6cfc14
Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 97 |
1 files changed, 0 insertions, 97 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 3c57d1ba..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -23,9 +23,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -33,9 +30,7 @@ import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -138,98 +133,6 @@ public interface BusConsumer { } /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper extends FetchingBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. - */ - private final ConsumerBuilder builder; - - /** - * Cambria client. - */ - private final CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers - messaging bus hosts. - * topic - topic for messages - * apiKey - API Key - * apiSecret - API Secret - * consumerGroup - Consumer Group - * consumerInstance - Consumer Instance - * fetchTimeout - Fetch Timeout - * fetchLimit - Fetch Limit - * - * @param busTopicParams - The parameters for the bus topic - */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - super(busTopicParams); - - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - try { - return this.consumer.fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {}", this, e.getMessage()); - sleepAfterFetchFailure(); - throw e; - } - } - - @Override - public void close() { - super.close(); - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; - } - } - - /** * Kafka based consumer. */ class KafkaConsumerWrapper extends FetchingBusConsumer { |