diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java | 192 |
1 files changed, 180 insertions, 12 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java index 3528ed9be3..6738b87853 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java @@ -21,7 +21,6 @@ package org.openecomp.sdc.be.components.distribution.engine; import java.io.IOException; -import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collection; @@ -54,6 +53,7 @@ import com.att.nsa.cambria.client.CambriaTopicManager; import com.google.gson.Gson; import fj.data.Either; +import jline.internal.Log; public class CambriaHandler { @@ -63,6 +63,169 @@ public class CambriaHandler { private Gson gson = new Gson(); + public static void main(String[] args) { + + // String userBodyJson ="{\"artifactName\":\"myartifact\", + // \"artifactType\":\"MURANO-PKG\", + // \"artifactDescription\":\"description\", + // \"payloadData\":\"UEsDBAoAAAAIAAeLb0bDQz\", \"Content-MD5\": + // \"YTg2Mjg4MWJhNmI5NzBiNzdDFkMWI=\" }"; + // System.out.println(userBodyJson); + // String encodeBase64Str = GeneralUtililty.calculateMD5 (userBodyJson); + // System.out.println(encodeBase64Str); + + CambriaTopicManager createTopicManager = null; + try { + List<String> servers = new ArrayList<String>(); + // servers.add("uebsb91kcdc.it.sdc.com:3904"); + // servers.add("uebsb92kcdc.it.sdc.com:3904"); + // servers.add("uebsb93kcdc.it.sdc.com:3904"); + servers.add("uebsb91sfdc.it.att.com:3904"); + servers.add("uebsb92sfdc.it.att.com:3904"); + + String key = "sSJc5qiBnKy2qrlc"; + String secret = "4ZRPzNJfEUK0sSNBvccd2m7X"; + + createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build(); + + String topicName = "ASDC-DISTR-NOTIF-TOPIC-PRODesofer"; + + String clientKey1 = "CGGoorrGPXPx2B1C"; + String clientSecret1 = "OTHk2mcCSbskEtHhDw8h5oUa"; + + CambriaTopicManager createStatusTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(servers).authenticatedBy(key, secret).build(); + String reportTopic = "ASDC-DISTR-STATUS-TOPIC-PRODESOFER"; + createStatusTopicManager.allowProducer(reportTopic, clientKey1); + + CambriaBatchingPublisher createSimplePublisher = new PublisherBuilder().onTopic(reportTopic).usingHttps().usingHosts(servers).build(); + createSimplePublisher.setApiCredentials(clientKey1, clientSecret1); + + DistributionStatusNotification distributionStatusNotification = new DistributionStatusNotification(); + distributionStatusNotification.setStatus(DistributionStatusNotificationEnum.DEPLOY_OK); + distributionStatusNotification.setArtifactURL("Ssssssss url"); + distributionStatusNotification.setDistributionID("idddddddddddddd"); + distributionStatusNotification.setTimestamp(System.currentTimeMillis()); + distributionStatusNotification.setConsumerID("my consumer id"); + + Gson gson = new Gson(); + int result = createSimplePublisher.send(PARTITION_KEY, gson.toJson(distributionStatusNotification)); + + List<message> messagesInQ = createSimplePublisher.close(20, TimeUnit.SECONDS); + System.out.println(messagesInQ == null ? 0 : messagesInQ.size()); + + // createTopicManager.createTopic(topicName, "my test topic", 1, 1); + + /* + * + * { "secret": "OTHk2mcCSbskEtHhDw8h5oUa", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "CGGoorrGPXPx2B1C" } + * + * + * { "secret": "FSlNJbmGWWBvBLJetQMYxPP6", "aux": { "email": "esofer@intl.sdc.com", "description": "test-keys" }, "key": "TAIEPO0aDU4VzM0G" } + * + */ + + String clientKey2 = "TAIEPO0aDU4VzM0G"; + + CambriaConsumer createConsumer1 = new ConsumerBuilder().authenticatedBy("asdc1", "consumerId1").onTopic(topicName).usingHttps().usingHosts(servers).build(); + createConsumer1.setApiCredentials(clientKey1, "OTHk2mcCSbskEtHhDw8h5oUa"); + + createTopicManager.allowConsumer(topicName, clientKey1); + + CambriaConsumer createConsumer2 = null; + if (true) { + createConsumer2 = new ConsumerBuilder().authenticatedBy("asdc2", "consumerId3").onTopic(topicName).usingHttps().usingHosts(servers).build(); + createConsumer2.setApiCredentials(clientKey2, "FSlNJbmGWWBvBLJetQMYxPP6"); + + createTopicManager.allowConsumer(topicName, clientKey2); + } + + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(servers).build(); + createSimplePublisher.setApiCredentials(key, secret); + createTopicManager.allowProducer(topicName, key); + + createSimplePublisher.send("aaaa", "{ my testttttttttttttttt }"); + + while (true) { + + Iterable<String> fetch1 = createConsumer1.fetch(); + + Iterator<String> iterator1 = fetch1.iterator(); + while (iterator1.hasNext()) { + System.out.println("***********************************************"); + System.out.println("client 1" + iterator1.next()); + System.out.println("***********************************************"); + } + + if (createConsumer2 != null) { + Iterable<String> fetch2 = createConsumer2.fetch(); + + Iterator<String> iterator2 = fetch2.iterator(); + while (iterator2.hasNext()) { + System.out.println("***********************************************"); + System.out.println("client 2" + iterator2.next()); + System.out.println("***********************************************"); + } + } + Thread.sleep(1000 * 20); + } + + // createTopicManager = CambriaClientFactory.createTopicManager( + // servers, "8F3MDAtMSBwwpSMy", "gzFmsTxSCtO5RQfAccM6PqqX"); + + // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD"); + // createTopicManager.deleteTopic("ASDC-DISTR-NOTIF-TOPIC-PROD1"); + + // CambriaIdentityManager createIdentityManager = + // CambriaClientFactory.createIdentityManager(null, null, null); + // createIdentityManager.setApiCredentials(arg0, arg1); + // createIdentityManager.cl + + // String topicName = " "; + // createTopicManager.createTopic(topicName, + // "ASDC distribution notification topic", 1, 1); + // + // Thread.sleep(10 * 1000); + // + // for (int i = 0; i < 5; i++) { + // try { + // boolean openForProducing = createTopicManager + // .isOpenForProducing(topicName); + // + // System.out.println("openForProducing=" + openForProducing); + // createTopicManager.allowProducer(topicName, + // "8F3MDAtMSBwwpSMy"); + // Set<String> allowedProducers = createTopicManager + // .getAllowedProducers(topicName); + // System.out.println(allowedProducers); + // + // } catch (Exception e) { + // e.printStackTrace(); + // } + // } + + // createTopicManager.createTopic("", "", 0, 0); + // createTopicManager.allowProducer(arg0, arg1); + // createTopicManager.getTopics(); + // createTopicManager.close(); + // CambriaClientFactory. + // CambriaBatchingPublisher createSimplePublisher = + // CambriaClientFactory.createSimplePublisher("hostlist", "topic"); + + // CambriaIdentityManager createIdentityManager = + // CambriaClientFactory.createIdentityManager(null, "apiKey", + // "apiSecret"); + // createIdentityManager. + + } catch (Exception e) { + Log.debug("Exception in main test of Cambria Handler: {}", e.getMessage(), e); + e.printStackTrace(); + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + } + /** * process the response error from Cambria client * @@ -121,7 +284,8 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).build(); + createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).build(); + Set<String> topics = createTopicManager.getTopics(); if (topics == null || true == topics.isEmpty()) { @@ -263,10 +427,13 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build(); + + createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(apiKey, secretKey).build(); + createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); - } catch (GeneralSecurityException | HttpException | IOException e) { + } catch (HttpException | IOException | GeneralSecurityException e) { + logger.debug("Failed to create topic {}", topicName, e); String methodName = new Object() { }.getClass().getEnclosingMethod().getName(); @@ -285,12 +452,13 @@ public class CambriaHandler { } } return new CambriaErrorResponse(CambriaOperationStatus.OK); + } public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); + createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.revokeProducer(topicName, subscriberApiKey); @@ -345,7 +513,7 @@ public class CambriaHandler { CambriaTopicManager createTopicManager = null; try { - createTopicManager = new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); + createTopicManager = new TopicManagerBuilder().usingHttps().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey).build(); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.allowProducer(topicName, subscriberApiKey); @@ -395,11 +563,11 @@ public class CambriaHandler { * @param consumerGroup * @param timeoutMS * @return - * @throws Exception + * @throws Exception */ public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { - CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); + CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps().usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); consumer.setApiCredentials(apiKey, secretKey); return consumer; } @@ -470,7 +638,7 @@ public class CambriaHandler { String json = gson.toJson(data); logger.trace("Before sending notification data {} to topic {}", json, topicName); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build(); createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); int result = createSimplePublisher.send(PARTITION_KEY, json); @@ -532,7 +700,7 @@ public class CambriaHandler { String json = gson.toJson(data); logger.debug("Before sending notification data {} to topic {}", json, topicName); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps().usingHosts(uebServers).build(); createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); int result = createSimplePublisher.send(PARTITION_KEY, json); @@ -559,7 +727,7 @@ public class CambriaHandler { } - logger.debug("Before closing publisher. Maximum timeout is {} seconds.", waitBeforeCloseTimeout); + logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout); try { List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS); if (messagesInQ != null && false == messagesInQ.isEmpty()) { @@ -593,7 +761,7 @@ public class CambriaHandler { hostSet.add(server); CambriaIdentityManager createIdentityManager = null; try { - createIdentityManager = new IdentityManagerBuilder().usingHosts(hostSet).build(); + createIdentityManager = new IdentityManagerBuilder().usingHttps().usingHosts(hostSet).build(); createIdentityManager.getApiKey(apiKey); response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); |