diff options
Diffstat (limited to 'components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java')
-rw-r--r-- | components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java | 103 |
1 files changed, 64 insertions, 39 deletions
diff --git a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java index 38c1ca81..2f1e19cc 100644 --- a/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java +++ b/components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2021 China Mobile. - * Copyright (c) 2021-2022 Wipro Limited. + * Copyright (C) 2022 Wipro Limited. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +21,16 @@ package org.onap.dcaegen2.kpi.utils; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; + import org.onap.dcaegen2.kpi.models.Configuration; -import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; /** * Utility class to perform actions related to Dmaap. @@ -45,35 +40,65 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me */ public class DmaapUtils { - public MessageRouterPublisher buildPublisher() { - final MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - return publisher; + /** + * Build publisher. + */ + public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) { + try { + return builder(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + + } + } + + /** + * Build consumer. + */ + public CambriaConsumer buildConsumer(Configuration config, String topic) { + + try { + return builderConsumer(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + } + } - public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) { - MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl) - .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) - .password(config.getAafPassword()).build()) - .build(); - MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition).build(); - return request; + private static PublisherBuilder builder(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedBuilder(config, topic); + } else { + return unAuthenticatedBuilder(config, topic); + } } - public MessageRouterSubscriber buildSubscriber() { - MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - return subscriber; + private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) { + return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); } - public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) { - MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl) - .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername()) - .password(config.getAafPassword()).build()) - .build(); - MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() - .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build(); - return request; + private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .logSendFailuresAfter(5); } + + private static ConsumerBuilder builderConsumer(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedConsumerBuilder(config, topic); + } else { + return unAuthenticatedConsumerBuilder(config, topic); + } + } + + private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000); + } + + private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) { + return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); + } + } |