diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java | 62 |
1 files changed, 46 insertions, 16 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java index 359330b81d..3f8abccb21 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java @@ -23,13 +23,19 @@ package org.openecomp.sdc.be.components.distribution.engine; import com.att.nsa.apiClient.credentials.ApiCredential; import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.apiClient.http.HttpObjectNotFoundException; -import com.att.nsa.cambria.client.*; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClient; import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.cambria.client.CambriaIdentityManager; import com.att.nsa.cambria.client.CambriaPublisher.message; +import com.att.nsa.cambria.client.CambriaTopicManager; import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import fj.data.Either; @@ -53,7 +59,7 @@ import java.util.regex.Pattern; import static java.util.concurrent.TimeUnit.SECONDS; @Component("cambriaHandler") -public class CambriaHandler { +public class CambriaHandler implements ICambriaHandler{ private static final Logger log = Logger.getLogger(CambriaHandler.class.getName()); private static final String PARTITION_KEY = "asdc" + "aa"; @@ -62,6 +68,9 @@ public class CambriaHandler { .getDistributionEngineConfiguration() .getDistributionStatusTopic() .getConsumerId(); + private static final boolean USE_HTTPS_WITH_DMAAP = ConfigurationManager.getConfigurationManager() + .getDistributionEngineConfiguration() + .isUseHttpsWithDmaap(); private final Gson gson = new Gson(); @@ -119,12 +128,13 @@ public class CambriaHandler { * @param hostSet * @return */ + @Override public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) { CambriaTopicManager createTopicManager = null; try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); + createTopicManager = buildCambriaClient(createTopicManagerBuilder(hostSet)); Set<String> topics = createTopicManager.getTopics(); @@ -251,14 +261,15 @@ public class CambriaHandler { * @param replicationCount * @return */ + @Override public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) { CambriaTopicManager createTopicManager = null; try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet) - .authenticatedBy(apiKey, secretKey)); - + AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, apiKey, secretKey); + createTopicManager = buildCambriaClient(clientBuilder); + createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); } @@ -282,13 +293,14 @@ public class CambriaHandler { return new CambriaErrorResponse(CambriaOperationStatus.OK); } - + @Override public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { String methodName = "unRegisterFromTopic"; CambriaTopicManager createTopicManager = null; try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet) - .authenticatedBy(managerApiKey, managerSecretKey)); + AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey); + + createTopicManager = buildCambriaClient(clientBuilder); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.revokeProducer(topicName, subscriberApiKey); @@ -324,6 +336,20 @@ public class CambriaHandler { return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); } + private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet, String managerApiKey, String managerSecretKey) { + AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet) + .authenticatedBy(managerApiKey, managerSecretKey); + if (USE_HTTPS_WITH_DMAAP) { + clientBuilder = clientBuilder.usingHttps(); + } + + return clientBuilder; + } + + private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet) { + return new TopicManagerBuilder().usingHosts(hostSet); + } + /** * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER * @@ -335,13 +361,14 @@ public class CambriaHandler { * @param topicName * @return */ + @Override public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { String methodName = "registerToTopic"; CambriaTopicManager createTopicManager = null; try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet) - .authenticatedBy(managerApiKey, managerSecretKey)); + AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey); + createTopicManager = buildCambriaClient(clientBuilder); if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { createTopicManager.allowProducer(topicName, subscriberApiKey); @@ -392,6 +419,7 @@ public class CambriaHandler { * @return * @throws Exception */ + @Override public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey) @@ -418,6 +446,7 @@ public class CambriaHandler { * @param topicConsumer * @return */ + @Override public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) { String methodName = "fetchFromTopic"; @@ -454,6 +483,7 @@ public class CambriaHandler { * @param data * @return */ + @Override public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) { CambriaBatchingPublisher createSimplePublisher = null; @@ -497,7 +527,7 @@ public class CambriaHandler { } } } - + @Override public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) { String methodName = "sendNotificationAndClose"; CambriaBatchingPublisher createSimplePublisher; @@ -564,7 +594,7 @@ public class CambriaHandler { return response; } - + @Override public CambriaErrorResponse getApiKey(String server, String apiKey) { CambriaErrorResponse response; @@ -586,7 +616,7 @@ public class CambriaHandler { return response; } - + @Override public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) { Either<ApiCredential, CambriaErrorResponse> result; @@ -610,7 +640,7 @@ public class CambriaHandler { } @VisibleForTesting - <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { - return (T) client.build(); + <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<T> client) throws MalformedURLException, GeneralSecurityException { + return client.build(); } } |