summaryrefslogtreecommitdiffstats
path: root/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java')
-rw-r--r--components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java103
1 files changed, 64 insertions, 39 deletions
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
index 38c1ca81..2f1e19cc 100644
--- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
+++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
- * Copyright (c) 2021-2022 Wipro Limited.
+ * Copyright (C) 2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,21 +21,16 @@
package org.onap.dcaegen2.kpi.utils;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+
import org.onap.dcaegen2.kpi.models.Configuration;
-import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
/**
* Utility class to perform actions related to Dmaap.
@@ -45,35 +40,65 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
*/
public class DmaapUtils {
- public MessageRouterPublisher buildPublisher() {
- final MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- return publisher;
+ /**
+ * Build publisher.
+ */
+ public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
+ try {
+ return builder(config, topic).build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ return null;
+
+ }
+ }
+
+ /**
+ * Build consumer.
+ */
+ public CambriaConsumer buildConsumer(Configuration config, String topic) {
+
+ try {
+ return builderConsumer(config, topic).build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ return null;
+ }
+
}
- public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
- MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
- .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
- .password(config.getAafPassword()).build())
- .build();
- MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition).build();
- return request;
+ private static PublisherBuilder builder(Configuration config, String topic) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config, topic);
+ } else {
+ return unAuthenticatedBuilder(config, topic);
+ }
}
- public MessageRouterSubscriber buildSubscriber() {
- MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- return subscriber;
+ private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
+ return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
+ config.getAafPassword());
}
- public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
- MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
- .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
- .password(config.getAafPassword()).build())
- .build();
- MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
- .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
- return request;
+ private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
+ return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
+ .logSendFailuresAfter(5);
}
+
+ private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
+ if (config.isSecured()) {
+ return authenticatedConsumerBuilder(config, topic);
+ } else {
+ return unAuthenticatedConsumerBuilder(config, topic);
+ }
+ }
+
+ private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
+ return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
+ .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
+ }
+
+ private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
+ return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
+ config.getAafPassword());
+ }
+
}