summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java46
1 files changed, 25 insertions, 21 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
index 6738b87853..e6d15b8d5e 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
@@ -21,6 +21,7 @@
package org.openecomp.sdc.be.components.distribution.engine;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,8 +32,10 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.att.nsa.cambria.client.*;
import org.apache.http.HttpStatus;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
import org.openecomp.sdc.common.config.EcompErrorName;
import org.slf4j.Logger;
@@ -40,16 +43,12 @@ import org.slf4j.LoggerFactory;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaIdentityManager;
import com.att.nsa.cambria.client.CambriaPublisher.message;
-import com.att.nsa.cambria.client.CambriaTopicManager;
import com.google.gson.Gson;
import fj.data.Either;
@@ -63,6 +62,8 @@ public class CambriaHandler {
private Gson gson = new Gson();
+ public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap();
+
public static void main(String[] args) {
// String userBodyJson ="{\"artifactName\":\"myartifact\",
@@ -86,18 +87,18 @@ public class CambriaHandler {
String key = "sSJc5qiBnKy2qrlc";
String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
String clientKey1 = "CGGoorrGPXPx2B1C";
String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
- CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+ CambriaTopicManager createStatusTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(servers).authenticatedBy(key, secret));
String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
createStatusTopicManager.allowProducer(reportTopic, clientKey1);
- CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build();
+ CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
@@ -126,20 +127,20 @@ public class CambriaHandler {
String clientKey2 = "TAIEPO0aDU4VzM0G";
- CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build();
+ CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
createTopicManager.allowConsumer(topicName, clientKey1);
CambriaConsumer createConsumer2 = null;
if (true) {
- createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build();
+ createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
createTopicManager.allowConsumer(topicName, clientKey2);
}
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build();
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(servers).build();
createSimplePublisher.setApiCredentials(key, secret);
createTopicManager.allowProducer(topicName, key);
@@ -284,7 +285,7 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build();
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
Set<String> topics = createTopicManager.getTopics();
@@ -318,7 +319,6 @@ public class CambriaHandler {
*
* set Cambria status and http code in case we succeed to fetch it
*
- * @param errorMessage
* @return
*/
private CambriaErrorResponse processError(Exception e) {
@@ -415,8 +415,6 @@ public class CambriaHandler {
* - list of U-EB servers
* @param apiKey
* @param secretKey
- * @param topicsList
- * - list of exists topics
* @param topicName
* - topic to create
* @param partitionCount
@@ -428,7 +426,7 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
@@ -458,7 +456,7 @@ public class CambriaHandler {
public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.revokeProducer(topicName, subscriberApiKey);
@@ -513,7 +511,7 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.allowProducer(topicName, subscriberApiKey);
@@ -567,7 +565,7 @@ public class CambriaHandler {
*/
public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
- CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
+ CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
consumer.setApiCredentials(apiKey, secretKey);
return consumer;
}
@@ -638,7 +636,7 @@ public class CambriaHandler {
String json = gson.toJson(data);
logger.trace("Before sending notification data {} to topic {}", json, topicName);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -700,7 +698,7 @@ public class CambriaHandler {
String json = gson.toJson(data);
logger.debug("Before sending notification data {} to topic {}", json, topicName);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -761,7 +759,7 @@ public class CambriaHandler {
hostSet.add(server);
CambriaIdentityManager createIdentityManager = null;
try {
- createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build();
+ createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
createIdentityManager.getApiKey(apiKey);
response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
@@ -775,4 +773,10 @@ public class CambriaHandler {
return response;
}
+ private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
+ if (useHttpsWithDmaap) {
+ client.usingHttps();
+ }
+ return (T)client.build();
+ }
}