summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java62
1 files changed, 46 insertions, 16 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
index 359330b81d..3f8abccb21 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
@@ -23,13 +23,19 @@ package org.openecomp.sdc.be.components.distribution.engine;
import com.att.nsa.apiClient.credentials.ApiCredential;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-import com.att.nsa.cambria.client.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClient;
import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.cambria.client.CambriaIdentityManager;
import com.att.nsa.cambria.client.CambriaPublisher.message;
+import com.att.nsa.cambria.client.CambriaTopicManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import fj.data.Either;
@@ -53,7 +59,7 @@ import java.util.regex.Pattern;
import static java.util.concurrent.TimeUnit.SECONDS;
@Component("cambriaHandler")
-public class CambriaHandler {
+public class CambriaHandler implements ICambriaHandler{
private static final Logger log = Logger.getLogger(CambriaHandler.class.getName());
private static final String PARTITION_KEY = "asdc" + "aa";
@@ -62,6 +68,9 @@ public class CambriaHandler {
.getDistributionEngineConfiguration()
.getDistributionStatusTopic()
.getConsumerId();
+ private static final boolean USE_HTTPS_WITH_DMAAP = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration()
+ .isUseHttpsWithDmaap();
private final Gson gson = new Gson();
@@ -119,12 +128,13 @@ public class CambriaHandler {
* @param hostSet
* @return
*/
+ @Override
public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
+ createTopicManager = buildCambriaClient(createTopicManagerBuilder(hostSet));
Set<String> topics = createTopicManager.getTopics();
@@ -251,14 +261,15 @@ public class CambriaHandler {
* @param replicationCount
* @return
*/
+ @Override
public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
- .authenticatedBy(apiKey, secretKey));
-
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, apiKey, secretKey);
+ createTopicManager = buildCambriaClient(clientBuilder);
+
createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
}
@@ -282,13 +293,14 @@ public class CambriaHandler {
return new CambriaErrorResponse(CambriaOperationStatus.OK);
}
-
+ @Override
public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
String methodName = "unRegisterFromTopic";
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
- .authenticatedBy(managerApiKey, managerSecretKey));
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
+
+ createTopicManager = buildCambriaClient(clientBuilder);
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.revokeProducer(topicName, subscriberApiKey);
@@ -324,6 +336,20 @@ public class CambriaHandler {
return new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
}
+ private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet, String managerApiKey, String managerSecretKey) {
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet)
+ .authenticatedBy(managerApiKey, managerSecretKey);
+ if (USE_HTTPS_WITH_DMAAP) {
+ clientBuilder = clientBuilder.usingHttps();
+ }
+
+ return clientBuilder;
+ }
+
+ private AbstractAuthenticatedManagerBuilder<CambriaTopicManager> createTopicManagerBuilder(Collection<String> hostSet) {
+ return new TopicManagerBuilder().usingHosts(hostSet);
+ }
+
/**
* register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
*
@@ -335,13 +361,14 @@ public class CambriaHandler {
* @param topicName
* @return
*/
+ @Override
public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
String methodName = "registerToTopic";
CambriaTopicManager createTopicManager = null;
try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)
- .authenticatedBy(managerApiKey, managerSecretKey));
+ AbstractAuthenticatedManagerBuilder<CambriaTopicManager> clientBuilder = createTopicManagerBuilder(hostSet, managerApiKey, managerSecretKey);
+ createTopicManager = buildCambriaClient(clientBuilder);
if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
createTopicManager.allowProducer(topicName, subscriberApiKey);
@@ -392,6 +419,7 @@ public class CambriaHandler {
* @return
* @throws Exception
*/
+ @Override
public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey)
@@ -418,6 +446,7 @@ public class CambriaHandler {
* @param topicConsumer
* @return
*/
+ @Override
public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
String methodName = "fetchFromTopic";
@@ -454,6 +483,7 @@ public class CambriaHandler {
* @param data
* @return
*/
+ @Override
public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
CambriaBatchingPublisher createSimplePublisher = null;
@@ -497,7 +527,7 @@ public class CambriaHandler {
}
}
}
-
+ @Override
public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
String methodName = "sendNotificationAndClose";
CambriaBatchingPublisher createSimplePublisher;
@@ -564,7 +594,7 @@ public class CambriaHandler {
return response;
}
-
+ @Override
public CambriaErrorResponse getApiKey(String server, String apiKey) {
CambriaErrorResponse response;
@@ -586,7 +616,7 @@ public class CambriaHandler {
return response;
}
-
+ @Override
public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
Either<ApiCredential, CambriaErrorResponse> result;
@@ -610,7 +640,7 @@ public class CambriaHandler {
}
@VisibleForTesting
- <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
- return (T) client.build();
+ <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<T> client) throws MalformedURLException, GeneralSecurityException {
+ return client.build();
}
}