diff options
author | Yuli Shlosberg <ys9693@att.com> | 2018-03-07 16:29:57 +0200 |
---|---|---|
committer | Yuli Shlosberg <ys9693@att.com> | 2018-03-08 14:23:18 +0200 |
commit | e8d8a37da95c6fea435e0b3e93a477b5aa45b9b1 (patch) | |
tree | 415c9e5643f051e792ee414d887f8fe996a2b372 /sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java | |
parent | 2f2d71255620b40e6021a54fe514783ebc2d5260 (diff) |
update distribution-client package namesv1.3.02.0.0-ONAPbeijing2.0.0-ONAP
Change-Id: Ic6f81bc8fdd3b021033c7c68e44f876a6ee1d21a
Issue-ID: SDC-952
Signed-off-by: Yuli Shlosberg <ys9693@att.com>
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java | 779 |
1 files changed, 779 insertions, 0 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java new file mode 100644 index 0000000..5d15046 --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java @@ -0,0 +1,779 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.impl; + +import static java.util.Objects.isNull; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; + +import org.onap.sdc.api.IDistributionClient; +import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder; +import org.onap.sdc.api.consumer.*; +import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.results.IDistributionClientDownloadResult; +import org.onap.sdc.api.results.IDistributionClientResult; +import org.onap.sdc.http.SdcConnectorClient; +import org.onap.sdc.http.TopicRegistrationResponse; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.DistributionClientConstants; +import org.onap.sdc.utils.GeneralUtils; +import org.onap.sdc.utils.Wrapper; +import org.onap.sdc.api.consumer.*; +import org.onap.sdc.api.notification.IVfModuleMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; +import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.cambria.client.CambriaIdentityManager; +import com.att.nsa.cambria.client.CambriaPublisher.message; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +import fj.data.Either; + +public class DistributionClientImpl implements IDistributionClient { + + private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName()); + + protected SdcConnectorClient asdcConnector = new SdcConnectorClient(); + private ScheduledExecutorService executorPool = null; + protected CambriaIdentityManager cambriaIdentityManager = null; + private List<String> brokerServers; + protected ApiCredential credential; + protected Configuration configuration; + private INotificationCallback callback; + private IStatusCallback statusCallback; + private String notificationTopic; + private String statusTopic; + private boolean isConsumerGroupGenerated = false; + + private boolean isInitialized, isStarted, isTerminated; + + @Override + public IConfiguration getConfiguration() { + return configuration; + } + + @Override + /* see javadoc */ + public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) { + + log.info("update DistributionClient configuration"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly"); + + boolean needToUpdateCambriaConsumer = false; + + if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) { + configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes()); + needToUpdateCambriaConsumer = true; + } + if (isPollingIntervalValid(conf.getPollingInterval())) { + configuration.setPollingInterval(conf.getPollingInterval()); + needToUpdateCambriaConsumer = true; + } + if (isPollingTimeoutValid(conf.getPollingTimeout())) { + configuration.setPollingTimeout(conf.getPollingTimeout()); + needToUpdateCambriaConsumer = true; + } + if (conf.getConsumerGroup() != null) { + configuration.setConsumerGroup(conf.getConsumerGroup()); + isConsumerGroupGenerated = false; + needToUpdateCambriaConsumer = true; + } else if (!isConsumerGroupGenerated) { + generateConsumerGroup(); + } + + if (needToUpdateCambriaConsumer) { + updateResult = restartConsumer(); + } + + return updateResult; + } + + @Override + /** + * Start polling the Notification topic + */ + public synchronized IDistributionClientResult start() { + + log.info("start DistributionClient"); + IDistributionClientResult startResult; + CambriaConsumer cambriaNotificationConsumer = null; + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotStarted(errorWrapper); + } + if (errorWrapper.isEmpty()) { + try { + cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); + + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + if (errorWrapper.isEmpty()) { + + List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); + // Remove nulls from list - workaround for how configuration is built + while (relevantArtifactTypes.remove(null)); + + NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); + executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + + handleStatusConsumer(errorWrapper, executorPool); + } + if (!errorWrapper.isEmpty()) { + startResult = errorWrapper.getInnerElement(); + } + else{ + startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); + isStarted = true; + } + return startResult; + } + + private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { + if( configuration.isConsumeProduceStatusTopic()){ + CambriaConsumer cambriaStatusConsumer = null; + try { + cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); + StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); + executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + } + + @Override + /* see javadoc */ + public synchronized IDistributionClientResult stop() { + + log.info("stop DistributionClient"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + // 1. stop polling notification topic + shutdownExecutor(); + + // 2. send to ASDC unregister to topic + IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential); + if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { + log.info("client failed to unregister from topics"); + } else { + log.info("client unregistered from topics successfully"); + } + asdcConnector.close(); + + try { + cambriaIdentityManager.deleteCurrentApiKey(); + } catch (HttpException | IOException e) { + log.debug("failed to delete cambria keys", e); + } + cambriaIdentityManager.close(); + + isInitialized = false; + isTerminated = true; + + DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly"); + return stopResult; + } + + @Override + public IDistributionClientDownloadResult download(IArtifactInfo artifactInfo) { + log.info("DistributionClient - download"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + IDistributionClientResult result = errorWrapper.getInnerElement(); + IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); + return downloadResult; + } + return asdcConnector.dowloadArtifact(artifactInfo); + } + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, + IStatusCallback statusCallback) { + IDistributionClientResult initResult; + if( !conf.isConsumeProduceStatusTopic() ){ + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'" ); + + } + else if( isNull(statusCallback) ){ + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined" ); + } + else{ + this.statusCallback = statusCallback; + initResult = init(conf, notificationCallback); + } + return initResult; + } + + @Override + /* + * see javadoc + */ + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { + + log.info("DistributionClient - init"); + + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateNotInitilized(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + if (errorWrapper.isEmpty()) { + validateAndInitConfiguration(errorWrapper, conf); + } + // 1. get ueb server list from configuration + if (errorWrapper.isEmpty()) { + initUebServerList(errorWrapper); + } + // 2.validate artifact types against asdc server + if (errorWrapper.isEmpty()) { + validateArtifactTypesWithAsdcServer(conf, errorWrapper); + } + // 3. create keys + if (errorWrapper.isEmpty()) { + this.callback = callback; + createUebKeys(errorWrapper); + } + // 4. register for topics + if (errorWrapper.isEmpty()) { + registerForTopics(errorWrapper); + } + + IDistributionClientResult result; + if (errorWrapper.isEmpty()) { + isInitialized = true; + result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly"); + } else { + result = errorWrapper.getInnerElement(); + } + + return result; + } + + private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) { + Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); + if (registerAsdcTopics.isRight()) { + + try { + cambriaIdentityManager.deleteCurrentApiKey(); + } catch (HttpException | IOException e) { + log.debug("failed to delete cambria keys", e); + } + errorWrapper.setInnerElement(registerAsdcTopics.right().value()); + } else { + TopicRegistrationResponse topics = registerAsdcTopics.left().value(); + notificationTopic = topics.getDistrNotificationTopicName(); + statusTopic = topics.getDistrStatusTopicName(); + } + + } + + private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { + initCambriaClient(errorWrapper); + if (errorWrapper.isEmpty()) { + log.debug("create keys"); + DistributionClientResultImpl createKeysResponse = createUebKeys(); + if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { + errorWrapper.setInnerElement(createKeysResponse); + } + } + } + + private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { + asdcConnector.init(configuration); + Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); + if (eitherValidArtifactTypesList.isRight()) { + DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); + // Support the case of a new client and older ASDC Server which does not have the API + if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) { + errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value()); + } + } else { + final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value(); + boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes()); + if (!isArtifactTypesValid) { + List<String> invalidArtifactTypes = new ArrayList<>(); + invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes()); + invalidArtifactTypes.removeAll(artifactTypesFromAsdc); + DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES, + "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc); + errorWrapper.setInnerElement(errorResponse); + } else { + log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes()); + } + } + } + + private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { + log.debug("get ueb cluster server list from component(configuration file)"); + + Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); + if (serverListResponse.isRight()) { + errorWrapper.setInnerElement(serverListResponse.right().value()); + } else { + + brokerServers = serverListResponse.left().value(); + } + + } + + private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { + if (isInitialized) { + log.warn("distribution client already initialized"); + DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized"); + errorWrapper.setInnerElement(alreadyInitResponse); + } + } + + @Override + public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) { + log.info("DistributionClient - sendDownloadStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + } + + private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) { + DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); + log.info("DistributionClient - sendStatus"); + Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher(); + if (eitherPublisher.isRight()) { + return eitherPublisher.right().value(); + } + CambriaBatchingPublisher pub = eitherPublisher.left().value(); + + log.debug("after create publisher server list " + brokerServers.toString()); + String jsonRequest = builder.build(); + + log.debug("try to send status " + jsonRequest); + + try { + pub.send("MyPartitionKey", jsonRequest); + Thread.sleep(1000L); + } catch (IOException e) { + log.debug("DistributionClient - sendDownloadStatus. Failed to send download status"); + } catch (InterruptedException e) { + log.debug("DistributionClient - sendDownloadStatus. thread was interrupted"); + } + + finally { + + try { + List<message> stuck = pub.close(10L, TimeUnit.SECONDS); + + if (!stuck.isEmpty()) { + log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent"); + } else { + statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent"); + } + } catch (IOException | InterruptedException e) { + log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher "); + } + + } + return statusResult; + } + + private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() { + CambriaBatchingPublisher cambriaPublisher = null; + try { + cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build(); + cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + } catch (MalformedURLException | GeneralSecurityException e) { + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + handleCambriaInitFailure(errorWrapper, e); + return Either.right(errorWrapper.getInnerElement()); + } + return Either.left(cambriaPublisher); + } + + @Override + public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) { + log.info("DistributionClient - sendDeploymentStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + } + + IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) { + log.info("DistributionClient - sendNotificationStatus"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified)); + } + + /* *************************** Private Methods *************************************************** */ + + protected DistributionClientResultImpl createUebKeys() { + DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); + try { + String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); + credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); + cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + + } catch (HttpException | CambriaApiException | IOException e) { + response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); + log.error(response.toString()); + } + return response; + } + + private IDistributionClientResult restartConsumer() { + shutdownExecutor(); + return start(); + } + + protected DistributionActionResultEnum validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) { + DistributionActionResultEnum result = DistributionActionResultEnum.SUCCESS; + + if (conf == null) { + result = DistributionActionResultEnum.CONFIGURATION_IS_MISSING; + } else if (conf.getConsumerID() == null || conf.getConsumerID().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID; + } else if (conf.getUser() == null || conf.getUser().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_USERNAME; + } else if (conf.getPassword() == null || conf.getPassword().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_PASSWORD; + } else if (conf.getMsgBusAddress() == null || conf.getMsgBusAddress().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS; + } else if (conf.getAsdcAddress() == null || conf.getAsdcAddress().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN; + } else if (!isValidFqdn(conf.getAsdcAddress())) { + result = DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN; + } else if (!isValidFqdns(conf.getMsgBusAddress())){ + result = DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS; + } else if (conf.getEnvironmentName() == null || conf.getEnvironmentName().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME; + } else if (conf.getRelevantArtifactTypes() == null || conf.getRelevantArtifactTypes().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES; + } + else if( conf.isConsumeProduceStatusTopic() && Objects.isNull(statusCallback) ){ + result = DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG; + } + // DistributionActionResultEnum.SUCCESS + else { + handleValidConf(conf); + } + + if (result != DistributionActionResultEnum.SUCCESS) { + + DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name()); + + log.error(initResult.toString()); + errorWrapper.setInnerElement(initResult); + } + return result; + } + + private void handleValidConf(IConfiguration conf) { + this.configuration = new Configuration(conf); + if (!isPollingIntervalValid(conf.getPollingInterval())) { + configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + } + if (!isPollingTimeoutValid(conf.getPollingTimeout())) { + configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); + } + if (conf.getConsumerGroup() == null) { + generateConsumerGroup(); + } + + //Default use HTTPS with DMAAP + if (conf.isUseHttpsWithDmaap() == null){ + configuration.setUseHttpsWithDmaap(true); + } + } + + private void generateConsumerGroup() { + String generatedConsumerGroup = UUID.randomUUID().toString(); + configuration.setConsumerGroup(generatedConsumerGroup); + isConsumerGroupGenerated = true; + } + + protected boolean isValidFqdn(String fqdn) { + try { + Matcher matcher = DistributionClientConstants.FQDN_PATTERN.matcher(fqdn); + return matcher.matches(); + } catch (Exception e) { + } + return false; + } + protected boolean isValidFqdns(List<String> fqdns) { + if (fqdns != null && !fqdns.isEmpty()) { + for (String fqdn : fqdns) { + if (isValidFqdn(fqdn)) { + continue; + } else { + return false; + } + } + return true; + } + return false; + } + + private void shutdownExecutor() { + if (executorPool == null) + return; + + executorPool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executorPool.awaitTermination(60, TimeUnit.SECONDS)) { + executorPool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorPool.awaitTermination(60, TimeUnit.SECONDS)) + log.error("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executorPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } finally { + isStarted = false; + } + } + + private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) { + if (errorWrapper.isEmpty()) { + validateInitilized(errorWrapper); + } + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + + } + + private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) { + if (!isInitialized) { + log.debug("client was not initialized"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); + errorWrapper.setInnerElement(result); + } + } + + private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) { + if (isStarted) { + log.debug("client already started"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started"); + errorWrapper.setInnerElement(result); + } + } + + private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) { + if (isTerminated) { + log.debug("client was terminated"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated"); + errorWrapper.setInnerElement(result); + } + } + + private boolean isPollingTimeoutValid(int timeout) { + boolean isValid = (timeout >= DistributionClientConstants.POLLING_TIMEOUT_SEC); + if (!isValid) { + log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.POLLING_TIMEOUT_SEC); + log.warn("setting polling interval to default: " + DistributionClientConstants.POLLING_TIMEOUT_SEC); + } + return isValid; + } + + private boolean isPollingIntervalValid(int pollingInt) { + boolean isValid = (pollingInt >= DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + if (!isValid) { + log.warn("polling interval is out of range. value should be greater than or equals to " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + log.warn("setting polling interval to default: " + DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + } + return isValid; + } + + private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) { + if (cambriaIdentityManager == null) { + try { + AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers); + if (configuration.isUseHttpsWithDmaap()){ + managerBuilder = managerBuilder.usingHttps(); + } + cambriaIdentityManager = managerBuilder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + } + + private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) { + final String errorMessage = "Failed initilizing cambria component:" + e.getMessage(); + IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage); + errorWrapper.setInnerElement(errorResponse); + log.error(errorMessage); + log.debug(errorMessage, e); + } + + @Override + public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) { + log.info("DistributionClient - sendDownloadStatus with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + } + + @Override + public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) { + log.info("DistributionClient - sendDeploymentStatus with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + } + + @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) { + log.info("DistributionClient - sendComponentDone status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendComponentDone status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + } + + + @Override + public List<IVfModuleMetadata> decodeVfModuleArtifact(byte[] artifactPayload) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); + final Type type = new TypeToken<List<VfModuleMetadata>>() { + }.getType(); + List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type); + return vfModules; + } + + + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) { + log.info("DistributionClient - sendFinalDistributionStatus status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + + @Override + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendFinalDistributionStatus status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + + } + + public Either<List<String>,IDistributionClientResult> getUEBServerList() { + List<String> msgBusAddresses = configuration.getMsgBusAddress(); + if(msgBusAddresses.isEmpty()){ + return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); + } + else{ + return GeneralUtils.convertToValidHostName(msgBusAddresses); + } + } + + + + + + +} |