diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java | 1416 |
1 files changed, 708 insertions, 708 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java index 5d15046..eda9cf6 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,6 +28,7 @@ import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -38,7 +39,12 @@ import java.util.regex.Matcher; import org.onap.sdc.api.IDistributionClient; import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder; -import org.onap.sdc.api.consumer.*; +import org.onap.sdc.api.consumer.IComponentDoneStatusMessage; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.api.consumer.IDistributionStatusMessage; +import org.onap.sdc.api.consumer.IFinalDistrStatusMessage; +import org.onap.sdc.api.consumer.INotificationCallback; +import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IArtifactInfo; import org.onap.sdc.api.results.IDistributionClientDownloadResult; import org.onap.sdc.api.results.IDistributionClientResult; @@ -48,7 +54,6 @@ import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.DistributionClientConstants; import org.onap.sdc.utils.GeneralUtils; import org.onap.sdc.utils.Wrapper; -import org.onap.sdc.api.consumer.*; import org.onap.sdc.api.notification.IVfModuleMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,708 +77,703 @@ import fj.data.Either; public class DistributionClientImpl implements IDistributionClient { - private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName()); - - protected SdcConnectorClient asdcConnector = new SdcConnectorClient(); - private ScheduledExecutorService executorPool = null; - protected CambriaIdentityManager cambriaIdentityManager = null; - private List<String> brokerServers; - protected ApiCredential credential; - protected Configuration configuration; - private INotificationCallback callback; - private IStatusCallback statusCallback; - private String notificationTopic; - private String statusTopic; - private boolean isConsumerGroupGenerated = false; - - private boolean isInitialized, isStarted, isTerminated; - - @Override - public IConfiguration getConfiguration() { - return configuration; - } - - @Override - /* see javadoc */ - public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) { - - log.info("update DistributionClient configuration"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - - IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly"); - - boolean needToUpdateCambriaConsumer = false; - - if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) { - configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes()); - needToUpdateCambriaConsumer = true; - } - if (isPollingIntervalValid(conf.getPollingInterval())) { - configuration.setPollingInterval(conf.getPollingInterval()); - needToUpdateCambriaConsumer = true; - } - if (isPollingTimeoutValid(conf.getPollingTimeout())) { - configuration.setPollingTimeout(conf.getPollingTimeout()); - needToUpdateCambriaConsumer = true; - } - if (conf.getConsumerGroup() != null) { - configuration.setConsumerGroup(conf.getConsumerGroup()); - isConsumerGroupGenerated = false; - needToUpdateCambriaConsumer = true; - } else if (!isConsumerGroupGenerated) { - generateConsumerGroup(); - } - - if (needToUpdateCambriaConsumer) { - updateResult = restartConsumer(); - } - - return updateResult; - } - - @Override - /** - * Start polling the Notification topic - */ - public synchronized IDistributionClientResult start() { - - log.info("start DistributionClient"); - IDistributionClientResult startResult; - CambriaConsumer cambriaNotificationConsumer = null; - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (errorWrapper.isEmpty()) { - validateNotStarted(errorWrapper); - } - if (errorWrapper.isEmpty()) { - try { - cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); - } - } - if (errorWrapper.isEmpty()) { - - List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); - // Remove nulls from list - workaround for how configuration is built - while (relevantArtifactTypes.remove(null)); - - NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); - executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - - handleStatusConsumer(errorWrapper, executorPool); - } - if (!errorWrapper.isEmpty()) { - startResult = errorWrapper.getInnerElement(); - } - else{ - startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); - isStarted = true; - } - return startResult; - } - - private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { - if( configuration.isConsumeProduceStatusTopic()){ - CambriaConsumer cambriaStatusConsumer = null; - try { - cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); - StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); - executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); - } - } - } - - @Override - /* see javadoc */ - public synchronized IDistributionClientResult stop() { - - log.info("stop DistributionClient"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - // 1. stop polling notification topic - shutdownExecutor(); - - // 2. send to ASDC unregister to topic - IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential); - if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.info("client failed to unregister from topics"); - } else { - log.info("client unregistered from topics successfully"); - } - asdcConnector.close(); - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - cambriaIdentityManager.close(); - - isInitialized = false; - isTerminated = true; - - DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly"); - return stopResult; - } - - @Override - public IDistributionClientDownloadResult download(IArtifactInfo artifactInfo) { - log.info("DistributionClient - download"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - IDistributionClientResult result = errorWrapper.getInnerElement(); - IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); - return downloadResult; - } - return asdcConnector.dowloadArtifact(artifactInfo); - } - @Override - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, - IStatusCallback statusCallback) { - IDistributionClientResult initResult; - if( !conf.isConsumeProduceStatusTopic() ){ - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'" ); - - } - else if( isNull(statusCallback) ){ - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined" ); - } - else{ - this.statusCallback = statusCallback; - initResult = init(conf, notificationCallback); - } - return initResult; - } - - @Override - /* - * see javadoc - */ - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { - - log.info("DistributionClient - init"); - - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateNotInitilized(errorWrapper); - if (errorWrapper.isEmpty()) { - validateNotTerminated(errorWrapper); - } - if (errorWrapper.isEmpty()) { - validateAndInitConfiguration(errorWrapper, conf); - } - // 1. get ueb server list from configuration - if (errorWrapper.isEmpty()) { - initUebServerList(errorWrapper); - } - // 2.validate artifact types against asdc server - if (errorWrapper.isEmpty()) { - validateArtifactTypesWithAsdcServer(conf, errorWrapper); - } - // 3. create keys - if (errorWrapper.isEmpty()) { - this.callback = callback; - createUebKeys(errorWrapper); - } - // 4. register for topics - if (errorWrapper.isEmpty()) { - registerForTopics(errorWrapper); - } - - IDistributionClientResult result; - if (errorWrapper.isEmpty()) { - isInitialized = true; - result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly"); - } else { - result = errorWrapper.getInnerElement(); - } - - return result; - } - - private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) { - Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); - if (registerAsdcTopics.isRight()) { - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - errorWrapper.setInnerElement(registerAsdcTopics.right().value()); - } else { - TopicRegistrationResponse topics = registerAsdcTopics.left().value(); - notificationTopic = topics.getDistrNotificationTopicName(); - statusTopic = topics.getDistrStatusTopicName(); - } - - } - - private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { - initCambriaClient(errorWrapper); - if (errorWrapper.isEmpty()) { - log.debug("create keys"); - DistributionClientResultImpl createKeysResponse = createUebKeys(); - if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - errorWrapper.setInnerElement(createKeysResponse); - } - } - } - - private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { - asdcConnector.init(configuration); - Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); - if (eitherValidArtifactTypesList.isRight()) { - DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); - // Support the case of a new client and older ASDC Server which does not have the API - if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) { - errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value()); - } - } else { - final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value(); - boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes()); - if (!isArtifactTypesValid) { - List<String> invalidArtifactTypes = new ArrayList<>(); - invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes()); - invalidArtifactTypes.removeAll(artifactTypesFromAsdc); - DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES, - "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc); - errorWrapper.setInnerElement(errorResponse); - } else { - log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes()); - } - } - } - - private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { - log.debug("get ueb cluster server list from component(configuration file)"); - - Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); - if (serverListResponse.isRight()) { - errorWrapper.setInnerElement(serverListResponse.right().value()); - } else { - - brokerServers = serverListResponse.left().value(); - } - - } - - private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { - if (isInitialized) { - log.warn("distribution client already initialized"); - DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized"); - errorWrapper.setInnerElement(alreadyInitResponse); - } - } - - @Override - public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) { - log.info("DistributionClient - sendDownloadStatus"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); - } - - private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) { - DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); - log.info("DistributionClient - sendStatus"); - Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher(); - if (eitherPublisher.isRight()) { - return eitherPublisher.right().value(); - } - CambriaBatchingPublisher pub = eitherPublisher.left().value(); - - log.debug("after create publisher server list " + brokerServers.toString()); - String jsonRequest = builder.build(); - - log.debug("try to send status " + jsonRequest); - - try { - pub.send("MyPartitionKey", jsonRequest); - Thread.sleep(1000L); - } catch (IOException e) { - log.debug("DistributionClient - sendDownloadStatus. Failed to send download status"); - } catch (InterruptedException e) { - log.debug("DistributionClient - sendDownloadStatus. thread was interrupted"); - } - - finally { - - try { - List<message> stuck = pub.close(10L, TimeUnit.SECONDS); - - if (!stuck.isEmpty()) { - log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent"); - } else { - statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent"); - } - } catch (IOException | InterruptedException e) { - log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher "); - } - - } - return statusResult; - } - - private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() { - CambriaBatchingPublisher cambriaPublisher = null; - try { - cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build(); - cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - } catch (MalformedURLException | GeneralSecurityException e) { - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - handleCambriaInitFailure(errorWrapper, e); - return Either.right(errorWrapper.getInnerElement()); - } - return Either.left(cambriaPublisher); - } - - @Override - public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) { - log.info("DistributionClient - sendDeploymentStatus"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); - } - - IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) { - log.info("DistributionClient - sendNotificationStatus"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified)); - } - - /* *************************** Private Methods *************************************************** */ - - protected DistributionClientResultImpl createUebKeys() { - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); - try { - String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); - credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); - cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - - } catch (HttpException | CambriaApiException | IOException e) { - response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); - log.error(response.toString()); - } - return response; - } - - private IDistributionClientResult restartConsumer() { - shutdownExecutor(); - return start(); - } - - protected DistributionActionResultEnum validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) { - DistributionActionResultEnum result = DistributionActionResultEnum.SUCCESS; - - if (conf == null) { - result = DistributionActionResultEnum.CONFIGURATION_IS_MISSING; - } else if (conf.getConsumerID() == null || conf.getConsumerID().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID; - } else if (conf.getUser() == null || conf.getUser().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_USERNAME; - } else if (conf.getPassword() == null || conf.getPassword().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_PASSWORD; - } else if (conf.getMsgBusAddress() == null || conf.getMsgBusAddress().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS; - } else if (conf.getAsdcAddress() == null || conf.getAsdcAddress().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN; - } else if (!isValidFqdn(conf.getAsdcAddress())) { - result = DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN; - } else if (!isValidFqdns(conf.getMsgBusAddress())){ - result = DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS; - } else if (conf.getEnvironmentName() == null || conf.getEnvironmentName().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME; - } else if (conf.getRelevantArtifactTypes() == null || conf.getRelevantArtifactTypes().isEmpty()) { - result = DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES; - } - else if( conf.isConsumeProduceStatusTopic() && Objects.isNull(statusCallback) ){ - result = DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG; - } - // DistributionActionResultEnum.SUCCESS - else { - handleValidConf(conf); - } - - if (result != DistributionActionResultEnum.SUCCESS) { - - DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name()); - - log.error(initResult.toString()); - errorWrapper.setInnerElement(initResult); - } - return result; - } - - private void handleValidConf(IConfiguration conf) { - this.configuration = new Configuration(conf); - if (!isPollingIntervalValid(conf.getPollingInterval())) { - configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); - } - if (!isPollingTimeoutValid(conf.getPollingTimeout())) { - configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); - } - if (conf.getConsumerGroup() == null) { - generateConsumerGroup(); - } - - //Default use HTTPS with DMAAP - if (conf.isUseHttpsWithDmaap() == null){ - configuration.setUseHttpsWithDmaap(true); - } - } - - private void generateConsumerGroup() { - String generatedConsumerGroup = UUID.randomUUID().toString(); - configuration.setConsumerGroup(generatedConsumerGroup); - isConsumerGroupGenerated = true; - } - - protected boolean isValidFqdn(String fqdn) { - try { - Matcher matcher = DistributionClientConstants.FQDN_PATTERN.matcher(fqdn); - return matcher.matches(); - } catch (Exception e) { - } - return false; - } - protected boolean isValidFqdns(List<String> fqdns) { - if (fqdns != null && !fqdns.isEmpty()) { - for (String fqdn : fqdns) { - if (isValidFqdn(fqdn)) { - continue; - } else { - return false; - } - } - return true; - } - return false; - } - - private void shutdownExecutor() { - if (executorPool == null) - return; - - executorPool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!executorPool.awaitTermination(60, TimeUnit.SECONDS)) { - executorPool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!executorPool.awaitTermination(60, TimeUnit.SECONDS)) - log.error("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executorPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } finally { - isStarted = false; - } - } - - private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) { - if (errorWrapper.isEmpty()) { - validateInitilized(errorWrapper); - } - if (errorWrapper.isEmpty()) { - validateNotTerminated(errorWrapper); - } - - } - - private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) { - if (!isInitialized) { - log.debug("client was not initialized"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); - errorWrapper.setInnerElement(result); - } - } - - private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) { - if (isStarted) { - log.debug("client already started"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started"); - errorWrapper.setInnerElement(result); - } - } - - private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) { - if (isTerminated) { - log.debug("client was terminated"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated"); - errorWrapper.setInnerElement(result); - } - } - - private boolean isPollingTimeoutValid(int timeout) { - boolean isValid = (timeout >= DistributionClientConstants.POLLING_TIMEOUT_SEC); - if (!isValid) { - log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.POLLING_TIMEOUT_SEC); - log.warn("setting polling interval to default: " + DistributionClientConstants.POLLING_TIMEOUT_SEC); - } - return isValid; - } - - private boolean isPollingIntervalValid(int pollingInt) { - boolean isValid = (pollingInt >= DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); - if (!isValid) { - log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); - log.warn("setting polling interval to default: " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); - } - return isValid; - } - - private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) { - if (cambriaIdentityManager == null) { - try { - AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers); - if (configuration.isUseHttpsWithDmaap()){ - managerBuilder = managerBuilder.usingHttps(); - } - cambriaIdentityManager = managerBuilder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); - } - } - } - - private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) { - final String errorMessage = "Failed initilizing cambria component:" + e.getMessage(); - IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage); - errorWrapper.setInnerElement(errorResponse); - log.error(errorMessage); - log.debug(errorMessage, e); - } - - @Override - public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) { - log.info("DistributionClient - sendDownloadStatus with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); - - } - - @Override - public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) { - log.info("DistributionClient - sendDeploymentStatus with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); - - } - - @Override - public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) { - log.info("DistributionClient - sendComponentDone status"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); - - } - - @Override - public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage, - String errorReason) { - log.info("DistributionClient - sendComponentDone status with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); - } - - - @Override - public List<IVfModuleMetadata> decodeVfModuleArtifact(byte[] artifactPayload) { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); - final Type type = new TypeToken<List<VfModuleMetadata>>() { - }.getType(); - List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type); - return vfModules; - } - - - public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) { - log.info("DistributionClient - sendFinalDistributionStatus status"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); - - } - - - @Override - public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage, - String errorReason) { - log.info("DistributionClient - sendFinalDistributionStatus status with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); - - - } - - public Either<List<String>,IDistributionClientResult> getUEBServerList() { - List<String> msgBusAddresses = configuration.getMsgBusAddress(); - if(msgBusAddresses.isEmpty()){ - return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); - } - else{ - return GeneralUtils.convertToValidHostName(msgBusAddresses); - } - } - - - - - - + public static final int POLLING_TIMEOUT_MULTIPLIER = 1000; + public static final long SLEEPING_THREAD_TIME = 1000L; + public static final long PUBLISHER_TIMEOUT = 10L; + public static final int TERMINATION_TIMEOUT = 60; + private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName()); + + protected SdcConnectorClient asdcConnector = new SdcConnectorClient(); + private ScheduledExecutorService executorPool = null; + protected CambriaIdentityManager cambriaIdentityManager = null; + private List<String> brokerServers; + protected ApiCredential credential; + protected Configuration configuration; + private INotificationCallback callback; + private IStatusCallback statusCallback; + private String notificationTopic; + private String statusTopic; + private boolean isConsumerGroupGenerated = false; + + private boolean isInitialized, isStarted, isTerminated; + + @Override + public IConfiguration getConfiguration() { + return configuration; + } + + @Override + /* see javadoc */ + public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) { + + log.info("update DistributionClient configuration"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly"); + + boolean needToUpdateCambriaConsumer = false; + + if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) { + configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes()); + needToUpdateCambriaConsumer = true; + } + if (isPollingIntervalValid(conf.getPollingInterval())) { + configuration.setPollingInterval(conf.getPollingInterval()); + needToUpdateCambriaConsumer = true; + } + if (isPollingTimeoutValid(conf.getPollingTimeout())) { + configuration.setPollingTimeout(conf.getPollingTimeout()); + needToUpdateCambriaConsumer = true; + } + if (conf.getConsumerGroup() != null) { + configuration.setConsumerGroup(conf.getConsumerGroup()); + isConsumerGroupGenerated = false; + needToUpdateCambriaConsumer = true; + } else if (!isConsumerGroupGenerated) { + generateConsumerGroup(); + } + + if (needToUpdateCambriaConsumer) { + updateResult = restartConsumer(); + } + + return updateResult; + } + + @Override + /** + * Start polling the Notification topic + */ + public synchronized IDistributionClientResult start() { + + log.info("start DistributionClient"); + IDistributionClientResult startResult; + CambriaConsumer cambriaNotificationConsumer = null; + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotStarted(errorWrapper); + } + if (errorWrapper.isEmpty()) { + try { + cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); + + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + if (errorWrapper.isEmpty()) { + + List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); + // Remove nulls from list - workaround for how configuration is built + relevantArtifactTypes.removeAll(Collections.singleton(null)); + + NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); + executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + + handleStatusConsumer(errorWrapper, executorPool); + } + if (!errorWrapper.isEmpty()) { + startResult = errorWrapper.getInnerElement(); + } else { + startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); + isStarted = true; + } + return startResult; + } + + private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { + if (configuration.isConsumeProduceStatusTopic()) { + CambriaConsumer cambriaStatusConsumer; + try { + cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); + StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); + executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + } + + @Override + /* see javadoc */ + public synchronized IDistributionClientResult stop() { + + log.info("stop DistributionClient"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + // 1. stop polling notification topic + shutdownExecutor(); + + // 2. send to ASDC unregister to topic + IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential); + if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { + log.info("client failed to unregister from topics"); + } else { + log.info("client unregistered from topics successfully"); + } + asdcConnector.close(); + + try { + cambriaIdentityManager.deleteCurrentApiKey(); + } catch (HttpException | IOException e) { + log.debug("failed to delete cambria keys", e); + } + cambriaIdentityManager.close(); + + isInitialized = false; + isTerminated = true; + + DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly"); + return stopResult; + } + + @Override + public IDistributionClientDownloadResult download(IArtifactInfo artifactInfo) { + log.info("DistributionClient - download"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + IDistributionClientResult result = errorWrapper.getInnerElement(); + IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); + return downloadResult; + } + return asdcConnector.dowloadArtifact(artifactInfo); + } + + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, + IStatusCallback statusCallback) { + IDistributionClientResult initResult; + if (!conf.isConsumeProduceStatusTopic()) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'"); + + } else if (isNull(statusCallback)) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined"); + } else { + this.statusCallback = statusCallback; + initResult = init(conf, notificationCallback); + } + return initResult; + } + + @Override + /* + * see javadoc + */ + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { + + log.info("DistributionClient - init"); + + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateNotInitilized(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + if (errorWrapper.isEmpty()) { + validateAndInitConfiguration(errorWrapper, conf); + } + // 1. get ueb server list from configuration + if (errorWrapper.isEmpty()) { + initUebServerList(errorWrapper); + } + // 2.validate artifact types against asdc server + if (errorWrapper.isEmpty()) { + validateArtifactTypesWithAsdcServer(conf, errorWrapper); + } + // 3. create keys + if (errorWrapper.isEmpty()) { + this.callback = callback; + createUebKeys(errorWrapper); + } + // 4. register for topics + if (errorWrapper.isEmpty()) { + registerForTopics(errorWrapper); + } + + IDistributionClientResult result; + if (errorWrapper.isEmpty()) { + isInitialized = true; + result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly"); + } else { + result = errorWrapper.getInnerElement(); + } + + return result; + } + + private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) { + Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); + if (registerAsdcTopics.isRight()) { + + try { + cambriaIdentityManager.deleteCurrentApiKey(); + } catch (HttpException | IOException e) { + log.debug("failed to delete cambria keys", e); + } + errorWrapper.setInnerElement(registerAsdcTopics.right().value()); + } else { + TopicRegistrationResponse topics = registerAsdcTopics.left().value(); + notificationTopic = topics.getDistrNotificationTopicName(); + statusTopic = topics.getDistrStatusTopicName(); + } + + } + + private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { + initCambriaClient(errorWrapper); + if (errorWrapper.isEmpty()) { + log.debug("create keys"); + DistributionClientResultImpl createKeysResponse = createUebKeys(); + if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { + errorWrapper.setInnerElement(createKeysResponse); + } + } + } + + private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { + asdcConnector.init(configuration); + Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); + if (eitherValidArtifactTypesList.isRight()) { + DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); + // Support the case of a new client and older ASDC Server which does not have the API + if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) { + errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value()); + } + } else { + final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value(); + boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes()); + if (!isArtifactTypesValid) { + List<String> invalidArtifactTypes = new ArrayList<>(); + invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes()); + invalidArtifactTypes.removeAll(artifactTypesFromAsdc); + DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES, + "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc); + errorWrapper.setInnerElement(errorResponse); + } else { + log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes()); + } + } + } + + private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { + log.debug("get ueb cluster server list from component(configuration file)"); + + Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); + if (serverListResponse.isRight()) { + errorWrapper.setInnerElement(serverListResponse.right().value()); + } else { + + brokerServers = serverListResponse.left().value(); + } + + } + + private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { + if (isInitialized) { + log.warn("distribution client already initialized"); + DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized"); + errorWrapper.setInnerElement(alreadyInitResponse); + } + } + + @Override + public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) { + log.info("DistributionClient - sendDownloadStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + } + + private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) { + DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); + log.info("DistributionClient - sendStatus"); + Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher(); + if (eitherPublisher.isRight()) { + return eitherPublisher.right().value(); + } + CambriaBatchingPublisher pub = eitherPublisher.left().value(); + + log.debug("after create publisher server list " + brokerServers.toString()); + String jsonRequest = builder.build(); + + log.debug("try to send status " + jsonRequest); + + try { + pub.send("MyPartitionKey", jsonRequest); + Thread.sleep(SLEEPING_THREAD_TIME); + } catch (IOException e) { + log.debug("DistributionClient - sendDownloadStatus. Failed to send download status"); + } catch (InterruptedException e) { + log.debug("DistributionClient - sendDownloadStatus. thread was interrupted"); + } finally { + + try { + List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS); + + if (!stuck.isEmpty()) { + log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent"); + } else { + statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent"); + } + } catch (IOException | InterruptedException e) { + log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher "); + } + + } + return statusResult; + } + + private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() { + CambriaBatchingPublisher cambriaPublisher = null; + try { + cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build(); + cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + } catch (MalformedURLException | GeneralSecurityException e) { + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + handleCambriaInitFailure(errorWrapper, e); + return Either.right(errorWrapper.getInnerElement()); + } + return Either.left(cambriaPublisher); + } + + @Override + public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) { + log.info("DistributionClient - sendDeploymentStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + } + + IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) { + log.info("DistributionClient - sendNotificationStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified)); + } + + /* *************************** Private Methods *************************************************** */ + + protected DistributionClientResultImpl createUebKeys() { + DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); + try { + String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); + credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); + cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + + } catch (HttpException | CambriaApiException | IOException e) { + response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); + log.error(response.toString()); + } + return response; + } + + private IDistributionClientResult restartConsumer() { + shutdownExecutor(); + return start(); + } + + protected DistributionActionResultEnum validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) { + DistributionActionResultEnum result = DistributionActionResultEnum.SUCCESS; + + if (conf == null) { + result = DistributionActionResultEnum.CONFIGURATION_IS_MISSING; + } else if (conf.getConsumerID() == null || conf.getConsumerID().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID; + } else if (conf.getUser() == null || conf.getUser().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_USERNAME; + } else if (conf.getPassword() == null || conf.getPassword().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_PASSWORD; + } else if (conf.getMsgBusAddress() == null || conf.getMsgBusAddress().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS; + } else if (conf.getAsdcAddress() == null || conf.getAsdcAddress().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN; + } else if (!isValidFqdn(conf.getAsdcAddress())) { + result = DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN; + } else if (!isValidFqdns(conf.getMsgBusAddress())) { + result = DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS; + } else if (conf.getEnvironmentName() == null || conf.getEnvironmentName().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME; + } else if (conf.getRelevantArtifactTypes() == null || conf.getRelevantArtifactTypes().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES; + } else if (conf.isConsumeProduceStatusTopic() && Objects.isNull(statusCallback)) { + result = DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG; + } else { // DistributionActionResultEnum.SUCCESS + handleValidConf(conf); + } + + if (result != DistributionActionResultEnum.SUCCESS) { + + DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name()); + + log.error(initResult.toString()); + errorWrapper.setInnerElement(initResult); + } + return result; + } + + private void handleValidConf(IConfiguration conf) { + this.configuration = new Configuration(conf); + if (!isPollingIntervalValid(conf.getPollingInterval())) { + configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + } + if (!isPollingTimeoutValid(conf.getPollingTimeout())) { + configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); + } + if (conf.getConsumerGroup() == null) { + generateConsumerGroup(); + } + + //Default use HTTPS with DMAAP + if (conf.isUseHttpsWithDmaap() == null) { + configuration.setUseHttpsWithDmaap(true); + } + } + + private void generateConsumerGroup() { + String generatedConsumerGroup = UUID.randomUUID().toString(); + configuration.setConsumerGroup(generatedConsumerGroup); + isConsumerGroupGenerated = true; + } + + protected boolean isValidFqdn(String fqdn) { + try { + Matcher matcher = DistributionClientConstants.FQDN_PATTERN.matcher(fqdn); + return matcher.matches(); + } catch (Exception e) { + } + return false; + } + + protected boolean isValidFqdns(List<String> fqdns) { + if (fqdns != null && !fqdns.isEmpty()) { + for (String fqdn : fqdns) { + if (isValidFqdn(fqdn)) { + continue; + } else { + return false; + } + } + return true; + } + return false; + } + + private void shutdownExecutor() { + if (executorPool == null) { + return; + } + + executorPool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS)) { + executorPool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS)) { + log.error("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executorPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } finally { + isStarted = false; + } + } + + private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) { + if (errorWrapper.isEmpty()) { + validateInitilized(errorWrapper); + } + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + + } + + private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) { + if (!isInitialized) { + log.debug("client was not initialized"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); + errorWrapper.setInnerElement(result); + } + } + + private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) { + if (isStarted) { + log.debug("client already started"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started"); + errorWrapper.setInnerElement(result); + } + } + + private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) { + if (isTerminated) { + log.debug("client was terminated"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated"); + errorWrapper.setInnerElement(result); + } + } + + private boolean isPollingTimeoutValid(int timeout) { + boolean isValid = (timeout >= DistributionClientConstants.POLLING_TIMEOUT_SEC); + if (!isValid) { + log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.POLLING_TIMEOUT_SEC); + log.warn("setting polling interval to default: " + DistributionClientConstants.POLLING_TIMEOUT_SEC); + } + return isValid; + } + + private boolean isPollingIntervalValid(int pollingInt) { + boolean isValid = (pollingInt >= DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + if (!isValid) { + log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + log.warn("setting polling interval to default: " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + } + return isValid; + } + + private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) { + if (cambriaIdentityManager == null) { + try { + AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers); + if (configuration.isUseHttpsWithDmaap()) { + managerBuilder = managerBuilder.usingHttps(); + } + cambriaIdentityManager = managerBuilder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + } + + private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) { + final String errorMessage = "Failed initilizing cambria component:" + e.getMessage(); + IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage); + errorWrapper.setInnerElement(errorResponse); + log.error(errorMessage); + log.debug(errorMessage, e); + } + + @Override + public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) { + log.info("DistributionClient - sendDownloadStatus with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + } + + @Override + public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) { + log.info("DistributionClient - sendDeploymentStatus with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + } + + @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) { + log.info("DistributionClient - sendComponentDone status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendComponentDone status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + } + + + @Override + public List<IVfModuleMetadata> decodeVfModuleArtifact(byte[] artifactPayload) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); + final Type type = new TypeToken<List<VfModuleMetadata>>() { + }.getType(); + List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type); + return vfModules; + } + + + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) { + log.info("DistributionClient - sendFinalDistributionStatus status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + + @Override + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendFinalDistributionStatus status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + + } + + public Either<List<String>, IDistributionClientResult> getUEBServerList() { + List<String> msgBusAddresses = configuration.getMsgBusAddress(); + if (msgBusAddresses.isEmpty()) { + return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); + } else { + return GeneralUtils.convertToValidHostName(msgBusAddresses); + } + } + + } |