diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java | 212 |
1 files changed, 173 insertions, 39 deletions
diff --git a/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java index 1543256..a8778e5 100644 --- a/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java +++ b/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java @@ -20,6 +20,8 @@ package org.openecomp.sdc.impl; +import static java.util.Objects.isNull; + import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; @@ -27,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -35,17 +38,16 @@ import java.util.regex.Matcher; import org.openecomp.sdc.api.IDistributionClient; import org.openecomp.sdc.api.IDistributionStatusMessageJsonBuilder; -import org.openecomp.sdc.api.consumer.IConfiguration; -import org.openecomp.sdc.api.consumer.IDistributionStatusMessage; -import org.openecomp.sdc.api.consumer.INotificationCallback; +import org.openecomp.sdc.api.consumer.*; import org.openecomp.sdc.api.notification.IArtifactInfo; import org.openecomp.sdc.api.notification.IVfModuleMetadata; import org.openecomp.sdc.api.results.IDistributionClientDownloadResult; import org.openecomp.sdc.api.results.IDistributionClientResult; -import org.openecomp.sdc.http.AsdcConnectorClient; +import org.openecomp.sdc.http.SdcConnectorClient; import org.openecomp.sdc.http.TopicRegistrationResponse; import org.openecomp.sdc.utils.DistributionActionResultEnum; import org.openecomp.sdc.utils.DistributionClientConstants; +import org.openecomp.sdc.utils.GeneralUtils; import org.openecomp.sdc.utils.Wrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,13 +73,14 @@ public class DistributionClientImpl implements IDistributionClient { private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName()); - protected AsdcConnectorClient asdcConnector = new AsdcConnectorClient(); + protected SdcConnectorClient asdcConnector = new SdcConnectorClient(); private ScheduledExecutorService executorPool = null; protected CambriaIdentityManager cambriaIdentityManager = null; private List<String> brokerServers; protected ApiCredential credential; protected Configuration configuration; private INotificationCallback callback; + private IStatusCallback statusCallback; private String notificationTopic; private String statusTopic; private boolean isConsumerGroupGenerated = false; @@ -139,7 +142,8 @@ public class DistributionClientImpl implements IDistributionClient { public synchronized IDistributionClientResult start() { log.info("start DistributionClient"); - CambriaConsumer cambriaConsumer = null; + IDistributionClientResult startResult; + CambriaConsumer cambriaNotificationConsumer = null; Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); validateRunReady(errorWrapper); if (errorWrapper.isEmpty()) { @@ -147,30 +151,49 @@ public class DistributionClientImpl implements IDistributionClient { } if (errorWrapper.isEmpty()) { try { - cambriaConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); + } catch (MalformedURLException | GeneralSecurityException e) { handleCambriaInitFailure(errorWrapper, e); } } - + if (errorWrapper.isEmpty()) { + + List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); + // Remove nulls from list - workaround for how configuration is built + while (relevantArtifactTypes.remove(null)); + + NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); + executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + + handleStatusConsumer(errorWrapper, executorPool); + } if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); + startResult = errorWrapper.getInnerElement(); + } + else{ + startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); + isStarted = true; } - - List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); - // Remove nulls from list - workaround for how configuration is built - while (relevantArtifactTypes.remove(null)); - - NotificationConsumer consumer = new NotificationConsumer(cambriaConsumer, callback, relevantArtifactTypes, this); - executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - - DistributionClientResultImpl startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); - isStarted = true; return startResult; } + private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { + if( configuration.isConsumeProduceStatusTopic()){ + CambriaConsumer cambriaStatusConsumer = null; + try { + cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) + .withSocketTimeout(configuration.getPollingTimeout() * 1000).build(); + StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); + executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + } catch (MalformedURLException | GeneralSecurityException e) { + handleCambriaInitFailure(errorWrapper, e); + } + } + } + @Override /* see javadoc */ public synchronized IDistributionClientResult stop() { @@ -219,7 +242,24 @@ public class DistributionClientImpl implements IDistributionClient { } return asdcConnector.dowloadArtifact(artifactInfo); } + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, + IStatusCallback statusCallback) { + IDistributionClientResult initResult; + if( !conf.isConsumeProduceStatusTopic() ){ + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'" ); + } + else if( isNull(statusCallback) ){ + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined" ); + } + else{ + this.statusCallback = statusCallback; + initResult = init(conf, notificationCallback); + } + return initResult; + } + @Override /* * see javadoc @@ -236,7 +276,7 @@ public class DistributionClientImpl implements IDistributionClient { if (errorWrapper.isEmpty()) { validateAndInitConfiguration(errorWrapper, conf); } - // 1. get servers list from ASDC + // 1. get ueb server list from configuration if (errorWrapper.isEmpty()) { initUebServerList(errorWrapper); } @@ -295,6 +335,7 @@ public class DistributionClientImpl implements IDistributionClient { } private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { + asdcConnector.init(configuration); Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); if (eitherValidArtifactTypesList.isRight()) { DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); @@ -319,15 +360,16 @@ public class DistributionClientImpl implements IDistributionClient { } private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { - log.debug("get cluster server list from ASDC"); - asdcConnector.init(configuration); + log.debug("get ueb cluster server list from component(configuration file)"); - Either<List<String>, IDistributionClientResult> serverListResponse = asdcConnector.getServerList(); + Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); if (serverListResponse.isRight()) { errorWrapper.setInnerElement(serverListResponse.right().value()); } else { + brokerServers = serverListResponse.left().value(); } + } private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { @@ -457,32 +499,25 @@ public class DistributionClientImpl implements IDistributionClient { result = DistributionActionResultEnum.CONF_MISSING_USERNAME; } else if (conf.getPassword() == null || conf.getPassword().isEmpty()) { result = DistributionActionResultEnum.CONF_MISSING_PASSWORD; + } else if (conf.getMsgBusAddress() == null || conf.getMsgBusAddress().isEmpty()) { + result = DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS; } else if (conf.getAsdcAddress() == null || conf.getAsdcAddress().isEmpty()) { result = DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN; } else if (!isValidFqdn(conf.getAsdcAddress())) { result = DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN; + } else if (!isValidFqdns(conf.getMsgBusAddress())){ + result = DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS; } else if (conf.getEnvironmentName() == null || conf.getEnvironmentName().isEmpty()) { result = DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME; } else if (conf.getRelevantArtifactTypes() == null || conf.getRelevantArtifactTypes().isEmpty()) { result = DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES; } + else if( conf.isConsumeProduceStatusTopic() && Objects.isNull(statusCallback) ){ + result = DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG; + } // DistributionActionResultEnum.SUCCESS else { - this.configuration = new Configuration(conf); - if (!isPollingIntervalValid(conf.getPollingInterval())) { - configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); - } - if (!isPollingTimeoutValid(conf.getPollingTimeout())) { - configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); - } - if (conf.getConsumerGroup() == null) { - generateConsumerGroup(); - } - - //Default use HTTPS with DMAAP - if (conf.isUseHttpsWithDmaap() == null){ - configuration.setUseHttpsWithDmaap(true); - } + handleValidConf(conf); } if (result != DistributionActionResultEnum.SUCCESS) { @@ -495,6 +530,24 @@ public class DistributionClientImpl implements IDistributionClient { return result; } + private void handleValidConf(IConfiguration conf) { + this.configuration = new Configuration(conf); + if (!isPollingIntervalValid(conf.getPollingInterval())) { + configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + } + if (!isPollingTimeoutValid(conf.getPollingTimeout())) { + configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); + } + if (conf.getConsumerGroup() == null) { + generateConsumerGroup(); + } + + //Default use HTTPS with DMAAP + if (conf.isUseHttpsWithDmaap() == null){ + configuration.setUseHttpsWithDmaap(true); + } + } + private void generateConsumerGroup() { String generatedConsumerGroup = UUID.randomUUID().toString(); configuration.setConsumerGroup(generatedConsumerGroup); @@ -509,6 +562,19 @@ public class DistributionClientImpl implements IDistributionClient { } return false; } + protected boolean isValidFqdns(List<String> fqdns) { + if (fqdns != null && !fqdns.isEmpty()) { + for (String fqdn : fqdns) { + if (isValidFqdn(fqdn)) { + continue; + } else { + return false; + } + } + return true; + } + return false; + } private void shutdownExecutor() { if (executorPool == null) @@ -633,6 +699,31 @@ public class DistributionClientImpl implements IDistributionClient { } @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) { + log.info("DistributionClient - sendComponentDone status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + @Override + public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendComponentDone status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + } + + + @Override public List<IVfModuleMetadata> decodeVfModuleArtifact(byte[] artifactPayload) { Gson gson = new GsonBuilder().setPrettyPrinting().create(); String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); @@ -641,4 +732,47 @@ public class DistributionClientImpl implements IDistributionClient { List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type); return vfModules; } + + + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) { + log.info("DistributionClient - sendFinalDistributionStatus status"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + + } + + + @Override + public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage, + String errorReason) { + log.info("DistributionClient - sendFinalDistributionStatus status with errorReason"); + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateRunReady(errorWrapper); + if (!errorWrapper.isEmpty()) { + return errorWrapper.getInnerElement(); + } + return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + + } + + public Either<List<String>,IDistributionClientResult> getUEBServerList() { + List<String> msgBusAddresses = configuration.getMsgBusAddress(); + if(msgBusAddresses.isEmpty()){ + return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); + } + else{ + return GeneralUtils.convertToValidHostName(msgBusAddresses); + } + } + + + + + + } |