diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java | 1081 |
1 files changed, 540 insertions, 541 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java index c496715a02..d6fee9a9d3 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java @@ -20,25 +20,7 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.http.HttpStatus; -import org.openecomp.sdc.be.config.BeEcompErrorManager; -import org.openecomp.sdc.be.config.ConfigurationManager; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; -import org.openecomp.sdc.common.config.EcompErrorName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.att.nsa.apiClient.credentials.ApiCredential; import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.apiClient.http.HttpObjectNotFoundException; import com.att.nsa.cambria.client.CambriaBatchingPublisher; @@ -53,574 +35,591 @@ import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.cambria.client.CambriaIdentityManager; import com.att.nsa.cambria.client.CambriaPublisher.message; import com.att.nsa.cambria.client.CambriaTopicManager; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; - import fj.data.Either; +import org.apache.http.HttpStatus; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Component("cambriaHandler") public class CambriaHandler { - private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(CambriaHandler.class); + + private static final String PARTITION_KEY = "asdc" + "aa"; + + private final String SEND_NOTIFICATION = "send notification"; + + private Gson gson = new Gson(); + + private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionStatusTopic().getConsumerId(); + + + + /** + * process the response error from Cambria client + * + * @param message + * @return + */ + private Integer processMessageException(String message) { + + String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" }; + + Integer result = checkPattern(patterns[0], message, 2); + if (result != null) { + return result; + } + result = checkPattern(patterns[1], message, 2); + + return result; + + } + + /** + * check whether the message has a match with a given pattern inside it + * + * @param patternStr + * @param message + * @param groupIndex + * @return + */ + private Integer checkPattern(String patternStr, String message, int groupIndex) { + Integer result = null; + + Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(message); + boolean find = matcher.find(); + if (find) { + String httpCode = matcher.group(groupIndex); + if (httpCode != null) { + try { + result = Integer.valueOf(httpCode); + } catch (NumberFormatException e) { + logger.debug("Failed to parse http code {}", httpCode); + } + } + } + return result; + } + + /** + * retrieve all topics from U-EB server + * + * @param hostSet + * @return + */ + public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) { + + CambriaTopicManager createTopicManager = null; + try { + + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); + + Set<String> topics = createTopicManager.getTopics(); + + if (topics == null || true == topics.isEmpty()) { + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null); + return Either.right(cambriaErrorResponse); + } + + return Either.left(topics); + + } catch (IOException | GeneralSecurityException e) { + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + logger.debug("Failed to fetch topics from U-EB server", e); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics"); + + return Either.right(cambriaErrorResponse); + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + } + + /** + * process the error message from Cambria client. + * + * set Cambria status and http code in case we succeed to fetch it + * + * @return + */ + private CambriaErrorResponse processError(Exception e) { + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(); + + Integer httpCode = processMessageException(e.getMessage()); + + if (httpCode != null) { + cambriaErrorResponse.setHttpCode(httpCode); + switch (httpCode.intValue()) { + + case 401: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR); + break; + case 409: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST); + break; + case 500: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR); + break; + default: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); + } + } else { + + boolean found = false; + Throwable throwable = e.getCause(); + if (throwable != null) { + String message = throwable.getMessage(); + + Throwable cause = throwable.getCause(); + + if (cause != null) { + Class<?> clazz = cause.getClass(); + String className = clazz.getName(); + if (className.endsWith("UnknownHostException")) { + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR); + cambriaErrorResponse.addVariable(message); + found = true; + } + } + } + + if (false == found) { + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); + cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); + } + } + + return cambriaErrorResponse; + } + + /** + * write the error to the log + * + * @param cambriaErrorResponse + * @param errorMessage + * @param methodName + * @param operationDesc + */ + private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) { + + String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()); + + switch (cambriaErrorResponse.getOperationStatus()) { + case UNKNOWN_HOST_ERROR: + String hostname = cambriaErrorResponse.getVariables().get(0); + BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode); + break; + case AUTHENTICATION_ERROR: + BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode); + break; + case CONNNECTION_ERROR: + BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode); + break; + + case INTERNAL_SERVER_ERROR: + BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc); + break; + default: + break; + } + + } + + /** + * create a topic if it does not exists in the topicsList + * + * @param hostSet + * - list of U-EB servers + * @param apiKey + * @param secretKey + * @param topicName + * - topic to create + * @param partitionCount + * @param replicationCount + * @return + */ + public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) { + + CambriaTopicManager createTopicManager = null; + try { + + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey)); + + createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); + + } catch (HttpException | IOException | GeneralSecurityException e) { + + logger.debug("Failed to create topic {}", topicName, e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) { + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic"); + } + + return cambriaErrorResponse; + + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + return new CambriaErrorResponse(CambriaOperationStatus.OK); + + } + + public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { + CambriaTopicManager createTopicManager = null; + try { + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); + + if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { + createTopicManager.revokeProducer(topicName, subscriberApiKey); + } else { + createTopicManager.revokeConsumer(topicName, subscriberApiKey); + } + + } catch (HttpObjectNotFoundException | GeneralSecurityException e) { + logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); + return cambriaErrorResponse; + + } catch (HttpException | IOException e) { + logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase()); + + return cambriaErrorResponse; + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); + return cambriaErrorResponse; + } + + /** + * + * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER + * + * @param hostSet + * @param managerApiKey + * @param managerSecretKey + * @param subscriberApiKey + * @param subscriberTypeEnum + * @param topicName + * @return + */ + public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { + + CambriaTopicManager createTopicManager = null; + try { + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); + + if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { + createTopicManager.allowProducer(topicName, subscriberApiKey); + } else { + createTopicManager.allowConsumer(topicName, subscriberApiKey); + } + + } catch (HttpObjectNotFoundException | GeneralSecurityException e) { + logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); + return cambriaErrorResponse; + + } catch (HttpException | IOException e) { + logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase()); + + return cambriaErrorResponse; + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); + return cambriaErrorResponse; + } + + /** + * create and retrieve a Cambria Consumer for a specific topic + * + * @param hostSet + * @param topicName + * @param apiKey + * @param secretKey + * @param consumerId + * @param consumerGroup + * @param timeoutMS + * @return + * @throws Exception + */ + public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { + + CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).waitAtServer(timeoutMS).build(); + consumer.setApiCredentials(apiKey, secretKey); + return consumer; + } + + public void closeConsumer(CambriaConsumer consumer) { + + if (consumer != null) { + consumer.close(); + } + + } + + /** + * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error) + * + * @param topicConsumer + * @return + */ + public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) { + + try { + Iterable<String> messages = topicConsumer.fetch(); + if (messages == null) { + messages = new ArrayList<String>(); + } + return Either.left(messages); + + } catch (IOException e) { + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage()); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic"); + + return Either.right(cambriaErrorResponse); + + } catch (Exception e) { + logger.debug("Failed to fetch from U-EB topic", e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR); + return Either.right(cambriaErrorResponse); + } + } + + /** + * Publish notification message to a given queue + * + * @param topicName + * @param uebPublicKey + * @param uebSecretKey + * @param uebServers + * @param data + * @return + */ + public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) { + + CambriaBatchingPublisher createSimplePublisher = null; + + try { + + String json = gson.toJson(data); + logger.trace("Before sending notification data {} to topic {}", json, topicName); + + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - private static final String PARTITION_KEY = "asdc" + "aa"; - - private final String SEND_NOTIFICATION = "send notification"; - - private Gson gson = new Gson(); - - public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap(); + int result = createSimplePublisher.send(PARTITION_KEY, json); + try { + Thread.sleep(1 * 1000); + } catch (InterruptedException e) { + logger.debug("Failed during sleep after sending the message.", e); + } - /** - * process the response error from Cambria client - * - * @param message - * @return - */ - private Integer processMessageException(String message) { - - String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" }; - - Integer result = checkPattern(patterns[0], message, 2); - if (result != null) { - return result; - } - result = checkPattern(patterns[1], message, 2); - - return result; - - } - - /** - * check whether the message has a match with a given pattern inside it - * - * @param patternStr - * @param message - * @param groupIndex - * @return - */ - private Integer checkPattern(String patternStr, String message, int groupIndex) { - Integer result = null; - - Pattern pattern = Pattern.compile(patternStr); - Matcher matcher = pattern.matcher(message); - boolean find = matcher.find(); - if (find) { - String httpCode = matcher.group(groupIndex); - if (httpCode != null) { - try { - result = Integer.valueOf(httpCode); - } catch (NumberFormatException e) { - logger.debug("Failed to parse http code {}", httpCode); - } - } - } - return result; - } - - /** - * retrieve all topics from U-EB server - * - * @param hostSet - * @return - */ - public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) { - - CambriaTopicManager createTopicManager = null; - try { - - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); - - Set<String> topics = createTopicManager.getTopics(); - - if (topics == null || true == topics.isEmpty()) { - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null); - return Either.right(cambriaErrorResponse); - } - - return Either.left(topics); - - } catch (IOException | GeneralSecurityException e) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - logger.debug("Failed to fetch topics from U-EB server", e); - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics"); - - return Either.right(cambriaErrorResponse); - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - } - - /** - * process the error message from Cambria client. - * - * set Cambria status and http code in case we succeed to fetch it - * - * @return - */ - private CambriaErrorResponse processError(Exception e) { - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(); - - Integer httpCode = processMessageException(e.getMessage()); - - if (httpCode != null) { - cambriaErrorResponse.setHttpCode(httpCode); - switch (httpCode.intValue()) { - - case 401: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR); - break; - case 409: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST); - break; - case 500: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR); - break; - default: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); - } - } else { - - boolean found = false; - Throwable throwable = e.getCause(); - if (throwable != null) { - String message = throwable.getMessage(); - - Throwable cause = throwable.getCause(); - - if (cause != null) { - Class<?> clazz = cause.getClass(); - String className = clazz.getName(); - if (className.endsWith("UnknownHostException")) { - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR); - cambriaErrorResponse.addVariable(message); - found = true; - } - } - } - - if (false == found) { - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); - cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); - } - } - - return cambriaErrorResponse; - } - - /** - * write the error to the log - * - * @param cambriaErrorResponse - * @param errorMessage - * @param methodName - * @param operationDesc - */ - private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) { - - String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode())); - - switch (cambriaErrorResponse.getOperationStatus()) { - case UNKNOWN_HOST_ERROR: - String hostname = cambriaErrorResponse.getVariables().get(0); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname); - BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode); - break; - case AUTHENTICATION_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode); - BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode); - break; - case CONNNECTION_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode); - BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode); - break; - - case INTERNAL_SERVER_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc); - BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc); - break; - default: - break; - } - - } - - /** - * create a topic if it does not exists in the topicsList - * - * @param hostSet - * - list of U-EB servers - * @param apiKey - * @param secretKey - * @param topicName - * - topic to create - * @param partitionCount - * @param replicationCount - * @return - */ - public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) { - - CambriaTopicManager createTopicManager = null; - try { - - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey)); - - createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); - - } catch (HttpException | IOException | GeneralSecurityException e) { - - logger.debug("Failed to create topic {}", topicName, e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) { - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic"); - } - - return cambriaErrorResponse; - - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - return new CambriaErrorResponse(CambriaOperationStatus.OK); - - } - - public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { - CambriaTopicManager createTopicManager = null; - try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); - - if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { - createTopicManager.revokeProducer(topicName, subscriberApiKey); - } else { - createTopicManager.revokeConsumer(topicName, subscriberApiKey); - } - - } catch (HttpObjectNotFoundException | GeneralSecurityException e) { - logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); - return cambriaErrorResponse; - - } catch (HttpException | IOException e) { - logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase()); - - return cambriaErrorResponse; - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); - return cambriaErrorResponse; - } - - /** - * - * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER - * - * @param hostSet - * @param topicName - * @param managerApiKey - * @param managerSecretKey - * @param subscriberApiKey - * @param subscriberTypeEnum - * @return - */ - public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { - - CambriaTopicManager createTopicManager = null; - try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); - - if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { - createTopicManager.allowProducer(topicName, subscriberApiKey); - } else { - createTopicManager.allowConsumer(topicName, subscriberApiKey); - } - - } catch (HttpObjectNotFoundException | GeneralSecurityException e) { - logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); - return cambriaErrorResponse; - - } catch (HttpException | IOException e) { - logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase()); - - return cambriaErrorResponse; - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); - return cambriaErrorResponse; - } - - /** - * create and retrieve a Cambria Consumer for a specific topic - * - * @param hostSet - * @param topicName - * @param apiKey - * @param secretKey - * @param consumerId - * @param consumerGroup - * @param timeoutMS - * @return - * @throws Exception - */ - public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { - - CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); - consumer.setApiCredentials(apiKey, secretKey); - return consumer; - } - - public void closeConsumer(CambriaConsumer consumer) { - - if (consumer != null) { - consumer.close(); - } - - } - - /** - * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error) - * - * @param topicConsumer - * @return - */ - public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) { - - try { - Iterable<String> messages = topicConsumer.fetch(); - if (messages == null) { - messages = new ArrayList<String>(); - } - return Either.left(messages); - - } catch (IOException e) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage()); - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic"); - - return Either.right(cambriaErrorResponse); - - } catch (Exception e) { - logger.debug("Failed to fetch from U-EB topic", e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR); - return Either.right(cambriaErrorResponse); - } - } + return response; - /** - * Publish notification message to a given queue - * - * @param topicName - * @param uebPublicKey - * @param uebSecretKey - * @param uebServers - * @param data - * @return - */ - public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) { + } catch (IOException | GeneralSecurityException e) { + logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); - CambriaBatchingPublisher createSimplePublisher = null; + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); - try { + CambriaErrorResponse cambriaErrorResponse = processError(e); - String json = gson.toJson(data); - logger.trace("Before sending notification data {} to topic {}", json, topicName); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); - createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - - int result = createSimplePublisher.send(PARTITION_KEY, json); - - try { - Thread.sleep(1 * 1000); - } catch (InterruptedException e) { - logger.debug("Failed during sleep after sending the message.", e); - } - - logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - - CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - - return response; + return cambriaErrorResponse; + } finally { + if (createSimplePublisher != null) { + logger.debug("Before closing publisher"); + createSimplePublisher.close(); + logger.debug("After closing publisher"); + } + } + } - } catch (IOException | GeneralSecurityException e) { - logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); + public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + CambriaBatchingPublisher createSimplePublisher = null; - CambriaErrorResponse cambriaErrorResponse = processError(e); + CambriaErrorResponse response = null; + try { - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION); + String json = gson.toJson(data); + logger.debug("Before sending notification data {} to topic {}", json, topicName); - return cambriaErrorResponse; - } finally { - if (createSimplePublisher != null) { - logger.debug("Before closing publisher"); - createSimplePublisher.close(); - logger.debug("After closing publisher"); - } - } - } + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - private String convertListToString(List<String> list) { - StringBuilder builder = new StringBuilder(); + int result = createSimplePublisher.send(PARTITION_KEY, json); - if (list != null) { - for (int i = 0; i < list.size(); i++) { - builder.append(list.get(i)); - if (i < list.size() - 1) { - builder.append(","); - } - } - } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.debug("Failed during sleep after sending the message.", e); + } - return builder.toString(); - } + logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) { + } catch (IOException | GeneralSecurityException e) { + logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); - CambriaBatchingPublisher createSimplePublisher = null; + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); - CambriaErrorResponse response = null; - try { + response = processError(e); - String json = gson.toJson(data); - logger.debug("Before sending notification data {} to topic {}", json, topicName); + writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); - createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); + return response; - int result = createSimplePublisher.send(PARTITION_KEY, json); + } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.debug("Failed during sleep after sending the message.", e); - } + logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout); + try { + List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS); + if (messagesInQ != null && false == messagesInQ.isEmpty()) { + logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size()); + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); + } else { + logger.debug("No message left in the queue after closing cambria publisher"); + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + } + } catch (IOException | InterruptedException e) { + logger.debug("Failed to close cambria publisher", e); + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); + } + logger.debug("After closing publisher"); - logger.debug("After sending notification data to topic {}. result is {}", topicName, result); + return response; - } catch (IOException | GeneralSecurityException e) { - logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); + } - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + public CambriaErrorResponse getApiKey(String server, String apiKey) { - response = processError(e); + CambriaErrorResponse response = null; - writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION); + List<String> hostSet = new ArrayList<>(); + hostSet.add(server); + try { + CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); + createIdentityManager.getApiKey(apiKey); - return response; + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - } + } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) { + logger.debug("Failed to fetch api key {} from server {}", apiKey, server, e); - logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout); - try { - List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS); - if (messagesInQ != null && false == messagesInQ.isEmpty()) { - logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size()); - response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); - } else { - logger.debug("No message left in the queue after closing cambria publisher"); - response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - } - } catch (IOException | InterruptedException e) { - logger.debug("Failed to close cambria publisher", e); - response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); - } - logger.debug("After closing publisher"); + response = processError(e); - return response; + } - } + return response; + } - public CambriaErrorResponse getApiKey(String server, String apiKey) { + public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) { + Either<ApiCredential, CambriaErrorResponse> result; - CambriaErrorResponse response = null; + try { + CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); - List<String> hostSet = new ArrayList<>(); - hostSet.add(server); - CambriaIdentityManager createIdentityManager = null; - try { - createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); - createIdentityManager.getApiKey(apiKey); - response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + String description = String.format("ASDC Key for %s", CONSUMER_ID); + ApiCredential credential = createIdentityManager.createApiKey("", description); + createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + result = Either.left(credential); - } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) { - logger.debug("Failed to fetch api key {} from server ", apiKey, server, e); + } catch (Exception e) { + logger.debug("Failed to create ueb keys for servers {}",hostSet, e); - response = processError(e); + result = Either.right(processError(e)); - } + } - return response; - } + return result; + } - private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { - if (useHttpsWithDmaap) { - client.usingHttps(); - } - return (T)client.build(); - } + @VisibleForTesting + <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { + return (T)client.build(); + } } |