aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java')
-rw-r--r--sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java212
1 files changed, 173 insertions, 39 deletions
diff --git a/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java
index 1543256..a8778e5 100644
--- a/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java
+++ b/sdc-distribution-client/src/main/java/org/openecomp/sdc/impl/DistributionClientImpl.java
@@ -20,6 +20,8 @@
package org.openecomp.sdc.impl;
+import static java.util.Objects.isNull;
+
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
@@ -27,6 +29,7 @@ import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,17 +38,16 @@ import java.util.regex.Matcher;
import org.openecomp.sdc.api.IDistributionClient;
import org.openecomp.sdc.api.IDistributionStatusMessageJsonBuilder;
-import org.openecomp.sdc.api.consumer.IConfiguration;
-import org.openecomp.sdc.api.consumer.IDistributionStatusMessage;
-import org.openecomp.sdc.api.consumer.INotificationCallback;
+import org.openecomp.sdc.api.consumer.*;
import org.openecomp.sdc.api.notification.IArtifactInfo;
import org.openecomp.sdc.api.notification.IVfModuleMetadata;
import org.openecomp.sdc.api.results.IDistributionClientDownloadResult;
import org.openecomp.sdc.api.results.IDistributionClientResult;
-import org.openecomp.sdc.http.AsdcConnectorClient;
+import org.openecomp.sdc.http.SdcConnectorClient;
import org.openecomp.sdc.http.TopicRegistrationResponse;
import org.openecomp.sdc.utils.DistributionActionResultEnum;
import org.openecomp.sdc.utils.DistributionClientConstants;
+import org.openecomp.sdc.utils.GeneralUtils;
import org.openecomp.sdc.utils.Wrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,13 +73,14 @@ public class DistributionClientImpl implements IDistributionClient {
private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName());
- protected AsdcConnectorClient asdcConnector = new AsdcConnectorClient();
+ protected SdcConnectorClient asdcConnector = new SdcConnectorClient();
private ScheduledExecutorService executorPool = null;
protected CambriaIdentityManager cambriaIdentityManager = null;
private List<String> brokerServers;
protected ApiCredential credential;
protected Configuration configuration;
private INotificationCallback callback;
+ private IStatusCallback statusCallback;
private String notificationTopic;
private String statusTopic;
private boolean isConsumerGroupGenerated = false;
@@ -139,7 +142,8 @@ public class DistributionClientImpl implements IDistributionClient {
public synchronized IDistributionClientResult start() {
log.info("start DistributionClient");
- CambriaConsumer cambriaConsumer = null;
+ IDistributionClientResult startResult;
+ CambriaConsumer cambriaNotificationConsumer = null;
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (errorWrapper.isEmpty()) {
@@ -147,30 +151,49 @@ public class DistributionClientImpl implements IDistributionClient {
}
if (errorWrapper.isEmpty()) {
try {
- cambriaConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
+ cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
.withSocketTimeout(configuration.getPollingTimeout() * 1000).build();
+
} catch (MalformedURLException | GeneralSecurityException e) {
handleCambriaInitFailure(errorWrapper, e);
}
}
-
+ if (errorWrapper.isEmpty()) {
+
+ List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
+ // Remove nulls from list - workaround for how configuration is built
+ while (relevantArtifactTypes.remove(null));
+
+ NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this);
+ executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
+ executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
+
+ handleStatusConsumer(errorWrapper, executorPool);
+ }
if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
+ startResult = errorWrapper.getInnerElement();
+ }
+ else{
+ startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly");
+ isStarted = true;
}
-
- List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
- // Remove nulls from list - workaround for how configuration is built
- while (relevantArtifactTypes.remove(null));
-
- NotificationConsumer consumer = new NotificationConsumer(cambriaConsumer, callback, relevantArtifactTypes, this);
- executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
- executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
-
- DistributionClientResultImpl startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly");
- isStarted = true;
return startResult;
}
+ private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) {
+ if( configuration.isConsumeProduceStatusTopic()){
+ CambriaConsumer cambriaStatusConsumer = null;
+ try {
+ cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
+ .withSocketTimeout(configuration.getPollingTimeout() * 1000).build();
+ StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback);
+ executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ handleCambriaInitFailure(errorWrapper, e);
+ }
+ }
+ }
+
@Override
/* see javadoc */
public synchronized IDistributionClientResult stop() {
@@ -219,7 +242,24 @@ public class DistributionClientImpl implements IDistributionClient {
}
return asdcConnector.dowloadArtifact(artifactInfo);
}
+ @Override
+ public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback,
+ IStatusCallback statusCallback) {
+ IDistributionClientResult initResult;
+ if( !conf.isConsumeProduceStatusTopic() ){
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'" );
+ }
+ else if( isNull(statusCallback) ){
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined" );
+ }
+ else{
+ this.statusCallback = statusCallback;
+ initResult = init(conf, notificationCallback);
+ }
+ return initResult;
+ }
+
@Override
/*
* see javadoc
@@ -236,7 +276,7 @@ public class DistributionClientImpl implements IDistributionClient {
if (errorWrapper.isEmpty()) {
validateAndInitConfiguration(errorWrapper, conf);
}
- // 1. get servers list from ASDC
+ // 1. get ueb server list from configuration
if (errorWrapper.isEmpty()) {
initUebServerList(errorWrapper);
}
@@ -295,6 +335,7 @@ public class DistributionClientImpl implements IDistributionClient {
}
private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
+ asdcConnector.init(configuration);
Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList();
if (eitherValidArtifactTypesList.isRight()) {
DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult();
@@ -319,15 +360,16 @@ public class DistributionClientImpl implements IDistributionClient {
}
private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
- log.debug("get cluster server list from ASDC");
- asdcConnector.init(configuration);
+ log.debug("get ueb cluster server list from component(configuration file)");
- Either<List<String>, IDistributionClientResult> serverListResponse = asdcConnector.getServerList();
+ Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
if (serverListResponse.isRight()) {
errorWrapper.setInnerElement(serverListResponse.right().value());
} else {
+
brokerServers = serverListResponse.left().value();
}
+
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
@@ -457,32 +499,25 @@ public class DistributionClientImpl implements IDistributionClient {
result = DistributionActionResultEnum.CONF_MISSING_USERNAME;
} else if (conf.getPassword() == null || conf.getPassword().isEmpty()) {
result = DistributionActionResultEnum.CONF_MISSING_PASSWORD;
+ } else if (conf.getMsgBusAddress() == null || conf.getMsgBusAddress().isEmpty()) {
+ result = DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS;
} else if (conf.getAsdcAddress() == null || conf.getAsdcAddress().isEmpty()) {
result = DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN;
} else if (!isValidFqdn(conf.getAsdcAddress())) {
result = DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN;
+ } else if (!isValidFqdns(conf.getMsgBusAddress())){
+ result = DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS;
} else if (conf.getEnvironmentName() == null || conf.getEnvironmentName().isEmpty()) {
result = DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME;
} else if (conf.getRelevantArtifactTypes() == null || conf.getRelevantArtifactTypes().isEmpty()) {
result = DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES;
}
+ else if( conf.isConsumeProduceStatusTopic() && Objects.isNull(statusCallback) ){
+ result = DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG;
+ }
// DistributionActionResultEnum.SUCCESS
else {
- this.configuration = new Configuration(conf);
- if (!isPollingIntervalValid(conf.getPollingInterval())) {
- configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
- }
- if (!isPollingTimeoutValid(conf.getPollingTimeout())) {
- configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
- }
- if (conf.getConsumerGroup() == null) {
- generateConsumerGroup();
- }
-
- //Default use HTTPS with DMAAP
- if (conf.isUseHttpsWithDmaap() == null){
- configuration.setUseHttpsWithDmaap(true);
- }
+ handleValidConf(conf);
}
if (result != DistributionActionResultEnum.SUCCESS) {
@@ -495,6 +530,24 @@ public class DistributionClientImpl implements IDistributionClient {
return result;
}
+ private void handleValidConf(IConfiguration conf) {
+ this.configuration = new Configuration(conf);
+ if (!isPollingIntervalValid(conf.getPollingInterval())) {
+ configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
+ }
+ if (!isPollingTimeoutValid(conf.getPollingTimeout())) {
+ configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
+ }
+ if (conf.getConsumerGroup() == null) {
+ generateConsumerGroup();
+ }
+
+ //Default use HTTPS with DMAAP
+ if (conf.isUseHttpsWithDmaap() == null){
+ configuration.setUseHttpsWithDmaap(true);
+ }
+ }
+
private void generateConsumerGroup() {
String generatedConsumerGroup = UUID.randomUUID().toString();
configuration.setConsumerGroup(generatedConsumerGroup);
@@ -509,6 +562,19 @@ public class DistributionClientImpl implements IDistributionClient {
}
return false;
}
+ protected boolean isValidFqdns(List<String> fqdns) {
+ if (fqdns != null && !fqdns.isEmpty()) {
+ for (String fqdn : fqdns) {
+ if (isValidFqdn(fqdn)) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
private void shutdownExecutor() {
if (executorPool == null)
@@ -633,6 +699,31 @@ public class DistributionClientImpl implements IDistributionClient {
}
@Override
+ public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) {
+ log.info("DistributionClient - sendComponentDone status");
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateRunReady(errorWrapper);
+ if (!errorWrapper.isEmpty()) {
+ return errorWrapper.getInnerElement();
+ }
+ return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+
+ }
+
+ @Override
+ public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage,
+ String errorReason) {
+ log.info("DistributionClient - sendComponentDone status with errorReason");
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateRunReady(errorWrapper);
+ if (!errorWrapper.isEmpty()) {
+ return errorWrapper.getInnerElement();
+ }
+ return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ }
+
+
+ @Override
public List<IVfModuleMetadata> decodeVfModuleArtifact(byte[] artifactPayload) {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8);
@@ -641,4 +732,47 @@ public class DistributionClientImpl implements IDistributionClient {
List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type);
return vfModules;
}
+
+
+ public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) {
+ log.info("DistributionClient - sendFinalDistributionStatus status");
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateRunReady(errorWrapper);
+ if (!errorWrapper.isEmpty()) {
+ return errorWrapper.getInnerElement();
+ }
+ return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+
+ }
+
+
+ @Override
+ public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage,
+ String errorReason) {
+ log.info("DistributionClient - sendFinalDistributionStatus status with errorReason");
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateRunReady(errorWrapper);
+ if (!errorWrapper.isEmpty()) {
+ return errorWrapper.getInnerElement();
+ }
+ return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+
+
+ }
+
+ public Either<List<String>,IDistributionClientResult> getUEBServerList() {
+ List<String> msgBusAddresses = configuration.getMsgBusAddress();
+ if(msgBusAddresses.isEmpty()){
+ return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file"));
+ }
+ else{
+ return GeneralUtils.convertToValidHostName(msgBusAddresses);
+ }
+ }
+
+
+
+
+
+
}