diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java index 6738b87853..e6d15b8d5e 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java @@ -21,6 +21,7 @@ package org.openecomp.sdc.be.components.distribution.engine; import java.io.IOException; +import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collection; @@ -31,8 +32,10 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.att.nsa.cambria.client.*; import org.apache.http.HttpStatus; import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; import org.openecomp.sdc.common.config.EcompErrorName; import org.slf4j.Logger; @@ -40,16 +43,12 @@ import org.slf4j.LoggerFactory; import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.apiClient.http.HttpObjectNotFoundException; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaIdentityManager; import com.att.nsa.cambria.client.CambriaPublisher.message; -import com.att.nsa.cambria.client.CambriaTopicManager; import com.google.gson.Gson; import fj.data.Either; @@ -63,6 +62,8 @@ public class CambriaHandler { private Gson gson = new Gson(); + public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap(); + public static void main(String[] args) { // String userBodyJson ="{\"artifactName\":\"myartifact\", @@ -86,18 +87,18 @@ public class CambriaHandler { String key = "sSJc5qiBnKy2qrlc"; String secret = "4ZRPzNJfEUK0sSNBvccd2m7X"; - createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build(); + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret)); String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer"; String clientKey1 = "CGGoorrGPXPx2B1C"; String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa"; - CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build(); + CambriaTopicManager createStatusTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret)); String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER"; createStatusTopicManager.allowProducer(reportTopic, clientKey1); - CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build(); + CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps(useHttpsWithDmaap).usingHosts(servers).build(); createSimplePublisher.setApiCredentials(clientKey1, clientSecret1); DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification(); @@ -126,20 +127,20 @@ public class CambriaHandler { String clientKey2 = "TAIEPO0aDU4VzM0G"; - CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build(); + CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build(); createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa"); createTopicManager.allowConsumer(topicName, clientKey1); CambriaConsumer createConsumer2 = null; if (true) { - createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build(); + createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build(); createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6"); createTopicManager.allowConsumer(topicName, clientKey2); } - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build(); + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build(); createSimplePublisher.setApiCredentials(key, secret); createTopicManager.allowProducer(topicName, key); @@ -284,7 +285,7 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build(); + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); Set<String> topics = createTopicManager.getTopics(); @@ -318,7 +319,6 @@ public class CambriaHandler { * * set Cambria status and http code in case we succeed to fetch it * - * @param errorMessage * @return */ private CambriaErrorResponse processError(Exception e) { @@ -415,8 +415,6 @@ public class CambriaHandler { * - list of U-EB servers * @param apiKey * @param secretKey - * @param topicsList - * - list of exists topics * @param topicName * - topic to create * @param partitionCount @@ -428,7 +426,7 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build(); + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey)); createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); @@ -458,7 +456,7 @@ public class CambriaHandler { public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.revokeProducer(topicName, subscriberApiKey); @@ -513,7 +511,7 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.allowProducer(topicName, subscriberApiKey); @@ -567,7 +565,7 @@ public class CambriaHandler { */ public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { - CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); + CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); consumer.setApiCredentials(apiKey, secretKey); return consumer; } @@ -638,7 +636,7 @@ public class CambriaHandler { String json = gson.toJson(data); logger.trace("Before sending notification data {} to topic {}", json, topicName); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build(); + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); int result = createSimplePublisher.send(PARTITION_KEY, json); @@ -700,7 +698,7 @@ public class CambriaHandler { String json = gson.toJson(data); logger.debug("Before sending notification data {} to topic {}", json, topicName); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build(); + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); int result = createSimplePublisher.send(PARTITION_KEY, json); @@ -761,7 +759,7 @@ public class CambriaHandler { hostSet.add(server); CambriaIdentityManager createIdentityManager = null; try { - createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build(); + createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); createIdentityManager.getApiKey(apiKey); response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); @@ -775,4 +773,10 @@ public class CambriaHandler { return response; } + private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { + if (useHttpsWithDmaap) { + client.usingHttps(); + } + return (T)client.build(); + } } |