summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java192
1 files changed, 180 insertions, 12 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
index 3528ed9be3..6738b87853 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
@@ -21,7 +21,6 @@
package org.openecomp.sdc.be.components.distribution.engine;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
@@ -54,6 +53,7 @@ import com.att.nsa.cambria.client.CambriaTopicManager;
import com.google.gson.Gson;
import fj.data.Either;
+import jline.internal.Log;
public class CambriaHandler {
@@ -63,6 +63,169 @@ public class CambriaHandler {
private Gson gson = new Gson();
+ public static void main(String[] args) {
+
+ // String userBodyJson ="{\"artifactName\":\"myartifact\",
+ // \"artifactType\":\"MURANO-PKG\",
+ // \"artifactDescription\":\"description\",
+ // \"payloadData\":\"UEsDBAoAAAAIAAeLb0bDQz\", \"Content-MD5\":
+ // \"YTg2Mjg4MWJhNmI5NzBiNzdDFkMWI=\" }";
+ // System.out.println(userBodyJson);
+ // String encodeBase64Str = GeneralUtililty.calculateMD5 (userBodyJson);
+ // System.out.println(encodeBase64Str);
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+ List<String> servers = new ArrayList<String>();
+ // servers.add("uebsb91kcdc.it.sdc.com:3904");
+ // servers.add("uebsb92kcdc.it.sdc.com:3904");
+ // servers.add("uebsb93kcdc.it.sdc.com:3904");
+ servers.add("uebsb91sfdc.it.att.com:3904");
+ servers.add("uebsb92sfdc.it.att.com:3904");
+
+ String key = "sSJc5qiBnKy2qrlc";
+ String secret = "4ZRPzNJfEUK0sSNBvccd2m7X";
+
+ createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+
+ String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer";
+
+ String clientKey1 = "CGGoorrGPXPx2B1C";
+ String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa";
+
+ CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build();
+ String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER";
+ createStatusTopicManager.allowProducer(reportTopic, clientKey1);
+
+ CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build();
+ createSimplePublisher.setApiCredentials(clientKey1, clientSecret1);
+
+ DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification();
+ distributionStatusNotification.setStatus(DistributionStatusNotificationEnum.DEPLOY_OK);
+ distributionStatusNotification.setArtifactURL("Ssssssss url");
+ distributionStatusNotification.setDistributionID("idddddddddddddd");
+ distributionStatusNotification.setTimestamp(System.currentTimeMillis());
+ distributionStatusNotification.setConsumerID("my consumer id");
+
+ Gson gson = new Gson();
+ int result = createSimplePublisher.send(PARTITION_KEY, gson.toJson(distributionStatusNotification));
+
+ List<message> messagesInQ = createSimplePublisher.close(20, TimeUnit.SECONDS);
+ System.out.println(messagesInQ == null ? 0 : messagesInQ.size());
+
+ // createTopicManager.createTopic(topicName, "my test topic", 1, 1);
+
+ /*
+ *
+ * { "secret": "OTHk2mcCSbskEtHhDw8h5oUa", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "CGGoorrGPXPx2B1C" }
+ *
+ *
+ * { "secret": "FSlNJbmGWWBvBLJetQMYxPP6", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "TAIEPO0aDU4VzM0G" }
+ *
+ */
+
+ String clientKey2 = "TAIEPO0aDU4VzM0G";
+
+ CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build();
+ createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa");
+
+ createTopicManager.allowConsumer(topicName, clientKey1);
+
+ CambriaConsumer createConsumer2 = null;
+ if (true) {
+ createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build();
+ createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6");
+
+ createTopicManager.allowConsumer(topicName, clientKey2);
+ }
+
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build();
+ createSimplePublisher.setApiCredentials(key, secret);
+ createTopicManager.allowProducer(topicName, key);
+
+ createSimplePublisher.send("aaaa", "{ my testttttttttttttttt }");
+
+ while (true) {
+
+ Iterable<String> fetch1 = createConsumer1.fetch();
+
+ Iterator<String> iterator1 = fetch1.iterator();
+ while (iterator1.hasNext()) {
+ System.out.println("***********************************************");
+ System.out.println("client 1" + iterator1.next());
+ System.out.println("***********************************************");
+ }
+
+ if (createConsumer2 != null) {
+ Iterable<String> fetch2 = createConsumer2.fetch();
+
+ Iterator<String> iterator2 = fetch2.iterator();
+ while (iterator2.hasNext()) {
+ System.out.println("***********************************************");
+ System.out.println("client 2" + iterator2.next());
+ System.out.println("***********************************************");
+ }
+ }
+ Thread.sleep(1000 * 20);
+ }
+
+ // createTopicManager = CambriaClientFactory.createTopicManager(
+ // servers, "8F3MDAtMSBwwpSMy", "gzFmsTxSCtO5RQfAccM6PqqX");
+
+ // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD");
+ // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD1");
+
+ // CambriaIdentityManager createIdentityManager =
+ // CambriaClientFactory.createIdentityManager(null, null, null);
+ // createIdentityManager.setApiCredentials(arg0, arg1);
+ // createIdentityManager.cl
+
+ // String topicName = " ";
+ // createTopicManager.createTopic(topicName,
+ // "ASDC distribution notification topic", 1, 1);
+ //
+ // Thread.sleep(10 * 1000);
+ //
+ // for (int i = 0; i < 5; i++) {
+ // try {
+ // boolean openForProducing = createTopicManager
+ // .isOpenForProducing(topicName);
+ //
+ // System.out.println("openForProducing=" + openForProducing);
+ // createTopicManager.allowProducer(topicName,
+ // "8F3MDAtMSBwwpSMy");
+ // Set<String> allowedProducers = createTopicManager
+ // .getAllowedProducers(topicName);
+ // System.out.println(allowedProducers);
+ //
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // }
+
+ // createTopicManager.createTopic("", "", 0, 0);
+ // createTopicManager.allowProducer(arg0, arg1);
+ // createTopicManager.getTopics();
+ // createTopicManager.close();
+ // CambriaClientFactory.
+ // CambriaBatchingPublisher createSimplePublisher =
+ // CambriaClientFactory.createSimplePublisher("hostlist", "topic");
+
+ // CambriaIdentityManager createIdentityManager =
+ // CambriaClientFactory.createIdentityManager(null, "apiKey",
+ // "apiSecret");
+ // createIdentityManager.
+
+ } catch (Exception e) {
+ Log.debug("Exception in main test of Cambria Handler: {}", e.getMessage(), e);
+ e.printStackTrace();
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+ }
+
/**
* process the response error from Cambria client
*
@@ -121,7 +284,8 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).build();
+ createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build();
+
Set<String> topics = createTopicManager.getTopics();
if (topics == null || true == topics.isEmpty()) {
@@ -263,10 +427,13 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
+
+ createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build();
+
createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
- } catch (GeneralSecurityException | HttpException | IOException e) {
+ } catch (HttpException | IOException | GeneralSecurityException e) {
+
logger.debug("Failed to create topic {}", topicName, e);
String methodName = new Object() {
}.getClass().getEnclosingMethod().getName();
@@ -285,12 +452,13 @@ public class CambriaHandler {
}
}
return new CambriaErrorResponse(CambriaOperationStatus.OK);
+
}
public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+ createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.revokeProducer(topicName, subscriberApiKey);
@@ -345,7 +513,7 @@ public class CambriaHandler {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
+ createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build();
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.allowProducer(topicName, subscriberApiKey);
@@ -395,11 +563,11 @@ public class CambriaHandler {
* @param consumerGroup
* @param timeoutMS
* @return
- * @throws Exception
+ * @throws Exception
*/
public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
- CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
+ CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
consumer.setApiCredentials(apiKey, secretKey);
return consumer;
}
@@ -470,7 +638,7 @@ public class CambriaHandler {
String json = gson.toJson(data);
logger.trace("Before sending notification data {} to topic {}", json, topicName);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -532,7 +700,7 @@ public class CambriaHandler {
String json = gson.toJson(data);
logger.debug("Before sending notification data {} to topic {}", json, topicName);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build();
createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
int result = createSimplePublisher.send(PARTITION_KEY, json);
@@ -559,7 +727,7 @@ public class CambriaHandler {
}
- logger.debug("Before closing publisher. Maximum timeout is {} seconds.", waitBeforeCloseTimeout);
+ logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
try {
List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
if (messagesInQ != null && false == messagesInQ.isEmpty()) {
@@ -593,7 +761,7 @@ public class CambriaHandler {
hostSet.add(server);
CambriaIdentityManager createIdentityManager = null;
try {
- createIdentityManager = new IdentityManagerBuilder().usingHosts(hostSet).build();
+ createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build();
createIdentityManager.getApiKey(apiKey);
response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);