summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java84
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java336
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java88
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java1081
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java19
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java40
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java633
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java557
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java512
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java343
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java161
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java122
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java2
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java91
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java76
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java220
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java65
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java75
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java526
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java43
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java70
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java37
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java6
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java76
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java158
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java11
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java22
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java224
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java212
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java79
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java156
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java64
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java8
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java503
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java2
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java68
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java183
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java11
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java9
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java19
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java23
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java69
42 files changed, 4119 insertions, 2965 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java
new file mode 100644
index 0000000000..0519f435e3
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java
@@ -0,0 +1,84 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import org.apache.http.conn.ConnectTimeoutException;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.common.api.Constants;
+import org.openecomp.sdc.common.datastructure.FunctionalInterfaces;
+import org.openecomp.sdc.common.datastructure.FunctionalInterfaces.SupplierThrows;
+import org.openecomp.sdc.common.http.client.api.HttpExecuteException;
+import org.openecomp.sdc.common.http.client.api.HttpRequest;
+import org.openecomp.sdc.common.http.client.api.HttpResponse;
+import org.openecomp.sdc.common.http.client.api.Responses;
+import org.openecomp.sdc.common.http.config.ExternalServiceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.Properties;
+import java.util.UUID;
+
+@Component
+public class AaiRequestHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(AaiRequestHandler.class);
+ private ExternalServiceConfig aaiConfig;
+
+ protected static final String OPERATIONAL_ENV_RESOURCE_CONFIG_PARAM = "operationalEnvironments";
+ protected static final String OPERATIONAL_ENV_RESOURCE = "/operational-environment";
+
+ @PostConstruct
+ public void init() {
+ logger.debug("AaiRequestHandler has been initialized.");
+
+ aaiConfig = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getAaiConfig();
+ logger.debug("AaiRequestHandler Configuration={}", aaiConfig);
+ }
+
+
+ public HttpResponse<String> getOperationalEnvById(String id) {
+ Properties headers = createHeaders();
+ String url = String.format("%s%s%s/%s",
+ aaiConfig.getHttpRequestConfig().getServerRootUrl(),
+ aaiConfig.getHttpRequestConfig().getResourceNamespaces().get(OPERATIONAL_ENV_RESOURCE_CONFIG_PARAM),
+ OPERATIONAL_ENV_RESOURCE, id);
+
+ SupplierThrows<HttpResponse<String>, Exception> httpGet = () -> HttpRequest.get(url, headers, aaiConfig.getHttpClientConfig());
+ long maxRetries = aaiConfig.getHttpClientConfig().getNumOfRetries();
+ try {
+ return FunctionalInterfaces.retryMethodOnException(httpGet, this::retryOnException, maxRetries);
+ }
+ catch (Exception e) {
+ logger.debug("Request failed with exception {}", getCause(e).getMessage());
+ return Responses.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+
+ private boolean retryOnException(Exception e) {
+ Throwable cause = getCause(e);
+ return !(cause instanceof ConnectTimeoutException || cause instanceof ConnectException || cause instanceof SocketTimeoutException);
+ }
+
+
+ private Throwable getCause(Exception e) {
+ if (e instanceof HttpExecuteException) {
+ return e.getCause();
+ }
+ return e;
+ }
+
+
+ private Properties createHeaders() {
+ Properties headers = new Properties();
+ headers.put(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON);
+ headers.put(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
+ headers.put(Constants.X_TRANSACTION_ID_HEADER, UUID.randomUUID().toString());
+
+ return headers;
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java
index e1a1270fe3..9c46c68cd8 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java
@@ -20,177 +20,185 @@
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.model.ArtifactDefinition;
+import org.openecomp.sdc.be.model.ComponentInstance;
+import org.openecomp.sdc.be.model.Service;
+import org.openecomp.sdc.common.api.ArtifactTypeEnum;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.openecomp.sdc.be.model.ArtifactDefinition;
-import org.openecomp.sdc.be.model.ComponentInstance;
-import org.openecomp.sdc.be.model.Service;
-import org.openecomp.sdc.common.api.ArtifactTypeEnum;
-
public class ArtifactInfoImpl implements IArtifactInfo {
- private String artifactName;
- private ArtifactTypeEnum artifactType;
- private String artifactURL;
- private String artifactChecksum;
- private String artifactDescription;
- private Integer artifactTimeout;
- private String artifactUUID;
- private String artifactVersion;
- private String generatedFromUUID;
- private List<String> relatedArtifacts;
-
- public ArtifactInfoImpl() {
- }
-
- private ArtifactInfoImpl(ArtifactDefinition artifactDef, String generatedFromUUID, List<String> relatedArtifacts) {
- artifactName = artifactDef.getArtifactName();
- artifactType = ArtifactTypeEnum.findType(artifactDef.getArtifactType());
- artifactChecksum = artifactDef.getArtifactChecksum();
- artifactDescription = artifactDef.getDescription();
- artifactTimeout = artifactDef.getTimeout();
- artifactUUID = artifactDef.getArtifactUUID();
- artifactVersion = artifactDef.getArtifactVersion();
- this.relatedArtifacts = relatedArtifacts;
- this.generatedFromUUID = generatedFromUUID;
- }
-
- public static List<ArtifactInfoImpl> convertToArtifactInfoImpl(Service service, ComponentInstance resourceInstance, Collection<ArtifactDefinition> list) {
- List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>();
- Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId));
- if (list != null) {
- for (ArtifactDefinition artifactDef : list) {
- String generatedFromUUID = null;
- if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) {
- ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0);
- generatedFromUUID = artifactFrom.getArtifactUUID();
- }
- ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, resourceInstance.getDeploymentArtifacts()));
- String artifactURL = ServiceDistributionArtifactsBuilder.buildResourceInstanceArtifactUrl(service, resourceInstance, artifactDef.getArtifactName());
- artifactInfoImpl.setArtifactURL(artifactURL);
- ret.add(artifactInfoImpl);
- }
- }
- return ret;
-
- }
-
- public static List<ArtifactInfoImpl> convertServiceArtifactToArtifactInfoImpl(Service service, Collection<ArtifactDefinition> list) {
- List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>();
- Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId));
- if (list != null) {
- for (ArtifactDefinition artifactDef : list) {
- String generatedFromUUID = null;
- if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) {
- ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0);
- generatedFromUUID = artifactFrom.getArtifactUUID();
- }
- ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, service.getDeploymentArtifacts()));
- String artifactURL = ServiceDistributionArtifactsBuilder.buildServiceArtifactUrl(service, artifactDef.getArtifactName());
- artifactInfoImpl.setArtifactURL(artifactURL);
- ret.add(artifactInfoImpl);
- }
- }
- return ret;
-
- }
-
- private static List<String> getUpdatedRequiredArtifactsFromNamesToUuids(ArtifactDefinition artifactDefinition, Map<String, ArtifactDefinition> artifacts) {
- List<String> requiredArtifacts = null;
- if (artifactDefinition != null && artifactDefinition.getRequiredArtifacts() != null && !artifactDefinition.getRequiredArtifacts().isEmpty() && artifacts != null && !artifacts.isEmpty()) {
- requiredArtifacts = artifacts.values().stream().filter(art -> artifactDefinition.getRequiredArtifacts().contains(art.getArtifactName())).map(art -> art.getArtifactUUID()).collect(Collectors.toList());
- }
- return requiredArtifacts;
- }
-
- public String getArtifactName() {
- return artifactName;
- }
-
- public void setArtifactName(String artifactName) {
- this.artifactName = artifactName;
- }
-
- public ArtifactTypeEnum getArtifactType() {
- return artifactType;
- }
-
- public void setArtifactType(ArtifactTypeEnum artifactType) {
- this.artifactType = artifactType;
- }
-
- public String getArtifactURL() {
- return artifactURL;
- }
-
- public void setArtifactURL(String artifactURL) {
- this.artifactURL = artifactURL;
- }
-
- public String getArtifactChecksum() {
- return artifactChecksum;
- }
-
- public void setArtifactChecksum(String artifactChecksum) {
- this.artifactChecksum = artifactChecksum;
- }
-
- public String getArtifactDescription() {
- return artifactDescription;
- }
-
- public void setArtifactDescription(String artifactDescription) {
- this.artifactDescription = artifactDescription;
- }
-
- public Integer getArtifactTimeout() {
- return artifactTimeout;
- }
-
- public void setArtifactTimeout(Integer artifactTimeout) {
- this.artifactTimeout = artifactTimeout;
- }
-
- public List<String> getRelatedArtifacts() {
- return relatedArtifacts;
- }
-
- public void setRelatedArtifacts(List<String> relatedArtifacts) {
- this.relatedArtifacts = relatedArtifacts;
- }
-
- @Override
- public String toString() {
- return "ArtifactInfoImpl [artifactName=" + artifactName + ", artifactType=" + artifactType + ", artifactURL=" + artifactURL + ", artifactChecksum=" + artifactChecksum + ", artifactDescription=" + artifactDescription + ", artifactTimeout="
- + artifactTimeout + ", artifactUUID=" + artifactUUID + ", artifactVersion=" + artifactVersion + ", generatedFromUUID=" + generatedFromUUID + ", relatedArtifacts=" + relatedArtifacts + "]";
- }
-
- public String getArtifactUUID() {
- return artifactUUID;
- }
-
- public void setArtifactUUID(String artifactUUID) {
- this.artifactUUID = artifactUUID;
- }
-
- public String getArtifactVersion() {
- return artifactVersion;
- }
-
- public void setArtifactVersion(String artifactVersion) {
- this.artifactVersion = artifactVersion;
- }
-
- public String getGeneratedFromUUID() {
- return generatedFromUUID;
- }
-
- public void setGeneratedFromUUID(String generatedFromUUID) {
- this.generatedFromUUID = generatedFromUUID;
- }
+ private String artifactName;
+ private ArtifactTypeEnum artifactType;
+ private String artifactURL;
+ private String artifactChecksum;
+ private String artifactDescription;
+ private Integer artifactTimeout;
+ private String artifactUUID;
+ private String artifactVersion;
+ private String generatedFromUUID;
+ private List<String> relatedArtifacts;
+
+ public ArtifactInfoImpl() {
+ }
+
+ private ArtifactInfoImpl(ArtifactDefinition artifactDef, String generatedFromUUID, List<String> relatedArtifacts) {
+ artifactName = artifactDef.getArtifactName();
+ artifactType = ArtifactTypeEnum.findType(artifactDef.getArtifactType());
+ artifactChecksum = artifactDef.getArtifactChecksum();
+ artifactDescription = artifactDef.getDescription();
+ artifactTimeout = artifactDef.getTimeout();
+ artifactUUID = artifactDef.getArtifactUUID();
+ artifactVersion = artifactDef.getArtifactVersion();
+ this.relatedArtifacts = relatedArtifacts;
+ this.generatedFromUUID = generatedFromUUID;
+ }
+
+ public static List<ArtifactInfoImpl> convertToArtifactInfoImpl(Service service, ComponentInstance resourceInstance, Collection<ArtifactDefinition> list) {
+ List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>();
+ Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId));
+ if (list != null) {
+ for (ArtifactDefinition artifactDef : list) {
+ String generatedFromUUID = null;
+ if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) {
+ ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0);
+ generatedFromUUID = artifactFrom.getArtifactUUID();
+ }
+ ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, resourceInstance.getDeploymentArtifacts()));
+ String artifactURL = ServiceDistributionArtifactsBuilder.buildResourceInstanceArtifactUrl(service, resourceInstance, artifactDef.getArtifactName());
+ artifactInfoImpl.setArtifactURL(artifactURL);
+ ret.add(artifactInfoImpl);
+ }
+ }
+ ret.stream().forEach(ArtifactInfoImpl::updateArtifactTimeout);
+ return ret;
+
+ }
+
+ public static List<ArtifactInfoImpl> convertServiceArtifactToArtifactInfoImpl(Service service, Collection<ArtifactDefinition> list) {
+ List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>();
+ Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId));
+ if (list != null) {
+ for (ArtifactDefinition artifactDef : list) {
+ String generatedFromUUID = null;
+ if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) {
+ ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0);
+ generatedFromUUID = artifactFrom.getArtifactUUID();
+ }
+ ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, service.getDeploymentArtifacts()));
+ String artifactURL = ServiceDistributionArtifactsBuilder.buildServiceArtifactUrl(service, artifactDef.getArtifactName());
+ artifactInfoImpl.setArtifactURL(artifactURL);
+ ret.add(artifactInfoImpl);
+ }
+ }
+ return ret;
+
+ }
+
+ private static List<String> getUpdatedRequiredArtifactsFromNamesToUuids(ArtifactDefinition artifactDefinition, Map<String, ArtifactDefinition> artifacts) {
+ List<String> requiredArtifacts = null;
+ if (artifactDefinition != null && artifactDefinition.getRequiredArtifacts() != null && !artifactDefinition.getRequiredArtifacts().isEmpty() && artifacts != null && !artifacts.isEmpty()) {
+ requiredArtifacts = artifacts.values().stream().filter(art -> artifactDefinition.getRequiredArtifacts().contains(art.getArtifactName())).map(art -> art.getArtifactUUID()).collect(Collectors.toList());
+ }
+ return requiredArtifacts;
+ }
+
+ public String getArtifactName() {
+ return artifactName;
+ }
+
+ public void setArtifactName(String artifactName) {
+ this.artifactName = artifactName;
+ }
+
+ public ArtifactTypeEnum getArtifactType() {
+ return artifactType;
+ }
+
+ public void setArtifactType(ArtifactTypeEnum artifactType) {
+ this.artifactType = artifactType;
+ }
+
+ public String getArtifactURL() {
+ return artifactURL;
+ }
+
+ public void setArtifactURL(String artifactURL) {
+ this.artifactURL = artifactURL;
+ }
+
+ public String getArtifactChecksum() {
+ return artifactChecksum;
+ }
+
+ public void setArtifactChecksum(String artifactChecksum) {
+ this.artifactChecksum = artifactChecksum;
+ }
+
+ public String getArtifactDescription() {
+ return artifactDescription;
+ }
+
+ public void setArtifactDescription(String artifactDescription) {
+ this.artifactDescription = artifactDescription;
+ }
+
+ public Integer getArtifactTimeout() {
+ return artifactTimeout;
+ }
+
+ public void setArtifactTimeout(Integer artifactTimeout) {
+ this.artifactTimeout = artifactTimeout;
+ }
+
+ public List<String> getRelatedArtifacts() {
+ return relatedArtifacts;
+ }
+
+ public void setRelatedArtifacts(List<String> relatedArtifacts) {
+ this.relatedArtifacts = relatedArtifacts;
+ }
+
+ @Override
+ public String toString() {
+ return "ArtifactInfoImpl [artifactName=" + artifactName + ", artifactType=" + artifactType + ", artifactURL=" + artifactURL + ", artifactChecksum=" + artifactChecksum + ", artifactDescription=" + artifactDescription + ", artifactTimeout="
+ + artifactTimeout + ", artifactUUID=" + artifactUUID + ", artifactVersion=" + artifactVersion + ", generatedFromUUID=" + generatedFromUUID + ", relatedArtifacts=" + relatedArtifacts + "]";
+ }
+
+ public String getArtifactUUID() {
+ return artifactUUID;
+ }
+
+ public void setArtifactUUID(String artifactUUID) {
+ this.artifactUUID = artifactUUID;
+ }
+
+ public String getArtifactVersion() {
+ return artifactVersion;
+ }
+
+ public void setArtifactVersion(String artifactVersion) {
+ this.artifactVersion = artifactVersion;
+ }
+
+ public String getGeneratedFromUUID() {
+ return generatedFromUUID;
+ }
+
+ public void setGeneratedFromUUID(String generatedFromUUID) {
+ this.generatedFromUUID = generatedFromUUID;
+ }
+
+ public void updateArtifactTimeout(){
+ int currentConfigTimeout = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getCurrentArtifactInstallationTimeout();
+ if(artifactTimeout == null || artifactTimeout < currentConfigTimeout)
+ artifactTimeout = currentConfigTimeout;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java
index 149ea2286a..3251f3d047 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java
@@ -20,67 +20,67 @@
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+
import java.util.ArrayList;
import java.util.List;
-import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
-
public class CambriaErrorResponse {
- public static final int HTTP_OK = 200;
+ public static final int HTTP_OK = 200;
- public static final int HTTP_INTERNAL_SERVER_ERROR = 500;
+ public static final int HTTP_INTERNAL_SERVER_ERROR = 500;
- CambriaOperationStatus operationStatus;
- Integer httpCode;
- List<String> variables = new ArrayList<String>();
+ CambriaOperationStatus operationStatus;
+ Integer httpCode;
+ List<String> variables = new ArrayList<String>();
- public CambriaErrorResponse() {
- super();
- }
+ public CambriaErrorResponse() {
+ super();
+ }
- public CambriaErrorResponse(CambriaOperationStatus operationStatus) {
- super();
- this.operationStatus = operationStatus;
- }
+ public CambriaErrorResponse(CambriaOperationStatus operationStatus) {
+ super();
+ this.operationStatus = operationStatus;
+ }
- public CambriaErrorResponse(CambriaOperationStatus operationStatus, Integer httpCode) {
- super();
- this.operationStatus = operationStatus;
- this.httpCode = httpCode;
- }
+ public CambriaErrorResponse(CambriaOperationStatus operationStatus, Integer httpCode) {
+ super();
+ this.operationStatus = operationStatus;
+ this.httpCode = httpCode;
+ }
- public CambriaOperationStatus getOperationStatus() {
- return operationStatus;
- }
+ public CambriaOperationStatus getOperationStatus() {
+ return operationStatus;
+ }
- public void setOperationStatus(CambriaOperationStatus operationStatus) {
- this.operationStatus = operationStatus;
- }
+ public void setOperationStatus(CambriaOperationStatus operationStatus) {
+ this.operationStatus = operationStatus;
+ }
- public Integer getHttpCode() {
- return httpCode;
- }
+ public Integer getHttpCode() {
+ return httpCode;
+ }
- public void setHttpCode(Integer httpCode) {
- this.httpCode = httpCode;
- }
+ public void setHttpCode(Integer httpCode) {
+ this.httpCode = httpCode;
+ }
- public void addVariable(String variable) {
- variables.add(variable);
- }
+ public void addVariable(String variable) {
+ variables.add(variable);
+ }
- public List<String> getVariables() {
- return variables;
- }
+ public List<String> getVariables() {
+ return variables;
+ }
- public void setVariables(List<String> variables) {
- this.variables = variables;
- }
+ public void setVariables(List<String> variables) {
+ this.variables = variables;
+ }
- @Override
- public String toString() {
- return "CambriaErrorResponse [operationStatus=" + operationStatus + ", httpCode=" + httpCode + ", variables=" + variables + "]";
- }
+ @Override
+ public String toString() {
+ return "CambriaErrorResponse [operationStatus=" + operationStatus + ", httpCode=" + httpCode + ", variables=" + variables + "]";
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
index c496715a02..d6fee9a9d3 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java
@@ -20,25 +20,7 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.http.HttpStatus;
-import org.openecomp.sdc.be.config.BeEcompErrorManager;
-import org.openecomp.sdc.be.config.ConfigurationManager;
-import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
-import org.openecomp.sdc.common.config.EcompErrorName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.att.nsa.apiClient.credentials.ApiCredential;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
@@ -53,574 +35,591 @@ import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
import com.att.nsa.cambria.client.CambriaPublisher.message;
import com.att.nsa.cambria.client.CambriaTopicManager;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
-
import fj.data.Either;
+import org.apache.http.HttpStatus;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Component("cambriaHandler")
public class CambriaHandler {
- private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(CambriaHandler.class);
+
+ private static final String PARTITION_KEY = "asdc" + "aa";
+
+ private final String SEND_NOTIFICATION = "send notification";
+
+ private Gson gson = new Gson();
+
+ private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionStatusTopic().getConsumerId();
+
+
+
+ /**
+ * process the response error from Cambria client
+ *
+ * @param message
+ * @return
+ */
+ private Integer processMessageException(String message) {
+
+ String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
+
+ Integer result = checkPattern(patterns[0], message, 2);
+ if (result != null) {
+ return result;
+ }
+ result = checkPattern(patterns[1], message, 2);
+
+ return result;
+
+ }
+
+ /**
+ * check whether the message has a match with a given pattern inside it
+ *
+ * @param patternStr
+ * @param message
+ * @param groupIndex
+ * @return
+ */
+ private Integer checkPattern(String patternStr, String message, int groupIndex) {
+ Integer result = null;
+
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(message);
+ boolean find = matcher.find();
+ if (find) {
+ String httpCode = matcher.group(groupIndex);
+ if (httpCode != null) {
+ try {
+ result = Integer.valueOf(httpCode);
+ } catch (NumberFormatException e) {
+ logger.debug("Failed to parse http code {}", httpCode);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * retrieve all topics from U-EB server
+ *
+ * @param hostSet
+ * @return
+ */
+ public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
+
+ Set<String> topics = createTopicManager.getTopics();
+
+ if (topics == null || true == topics.isEmpty()) {
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
+ return Either.right(cambriaErrorResponse);
+ }
+
+ return Either.left(topics);
+
+ } catch (IOException | GeneralSecurityException e) {
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ logger.debug("Failed to fetch topics from U-EB server", e);
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
+
+ return Either.right(cambriaErrorResponse);
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ }
+
+ /**
+ * process the error message from Cambria client.
+ *
+ * set Cambria status and http code in case we succeed to fetch it
+ *
+ * @return
+ */
+ private CambriaErrorResponse processError(Exception e) {
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
+
+ Integer httpCode = processMessageException(e.getMessage());
+
+ if (httpCode != null) {
+ cambriaErrorResponse.setHttpCode(httpCode);
+ switch (httpCode.intValue()) {
+
+ case 401:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
+ break;
+ case 409:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
+ break;
+ case 500:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ break;
+ default:
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
+ }
+ } else {
+
+ boolean found = false;
+ Throwable throwable = e.getCause();
+ if (throwable != null) {
+ String message = throwable.getMessage();
+
+ Throwable cause = throwable.getCause();
+
+ if (cause != null) {
+ Class<?> clazz = cause.getClass();
+ String className = clazz.getName();
+ if (className.endsWith("UnknownHostException")) {
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
+ cambriaErrorResponse.addVariable(message);
+ found = true;
+ }
+ }
+ }
+
+ if (false == found) {
+ cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
+ cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ return cambriaErrorResponse;
+ }
+
+ /**
+ * write the error to the log
+ *
+ * @param cambriaErrorResponse
+ * @param errorMessage
+ * @param methodName
+ * @param operationDesc
+ */
+ private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
+
+ String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode());
+
+ switch (cambriaErrorResponse.getOperationStatus()) {
+ case UNKNOWN_HOST_ERROR:
+ String hostname = cambriaErrorResponse.getVariables().get(0);
+ BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
+ break;
+ case AUTHENTICATION_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
+ break;
+ case CONNNECTION_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
+ break;
+
+ case INTERNAL_SERVER_ERROR:
+ BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ /**
+ * create a topic if it does not exists in the topicsList
+ *
+ * @param hostSet
+ * - list of U-EB servers
+ * @param apiKey
+ * @param secretKey
+ * @param topicName
+ * - topic to create
+ * @param partitionCount
+ * @param replicationCount
+ * @return
+ */
+ public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
+
+ createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
+
+ } catch (HttpException | IOException | GeneralSecurityException e) {
+
+ logger.debug("Failed to create topic {}", topicName, e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
+ }
+
+ return cambriaErrorResponse;
+
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+ return new CambriaErrorResponse(CambriaOperationStatus.OK);
+
+ }
+
+ public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
+ CambriaTopicManager createTopicManager = null;
+ try {
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
+
+ if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
+ createTopicManager.revokeProducer(topicName, subscriberApiKey);
+ } else {
+ createTopicManager.revokeConsumer(topicName, subscriberApiKey);
+ }
+
+ } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
+ logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
+ return cambriaErrorResponse;
+
+ } catch (HttpException | IOException e) {
+ logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
+
+ return cambriaErrorResponse;
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
+ return cambriaErrorResponse;
+ }
+
+ /**
+ *
+ * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
+ *
+ * @param hostSet
+ * @param managerApiKey
+ * @param managerSecretKey
+ * @param subscriberApiKey
+ * @param subscriberTypeEnum
+ * @param topicName
+ * @return
+ */
+ public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) {
+
+ CambriaTopicManager createTopicManager = null;
+ try {
+ createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
+
+ if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
+ createTopicManager.allowProducer(topicName, subscriberApiKey);
+ } else {
+ createTopicManager.allowConsumer(topicName, subscriberApiKey);
+ }
+
+ } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
+ logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
+ return cambriaErrorResponse;
+
+ } catch (HttpException | IOException e) {
+ logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
+
+ return cambriaErrorResponse;
+ } finally {
+ if (createTopicManager != null) {
+ createTopicManager.close();
+ }
+ }
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
+ return cambriaErrorResponse;
+ }
+
+ /**
+ * create and retrieve a Cambria Consumer for a specific topic
+ *
+ * @param hostSet
+ * @param topicName
+ * @param apiKey
+ * @param secretKey
+ * @param consumerId
+ * @param consumerGroup
+ * @param timeoutMS
+ * @return
+ * @throws Exception
+ */
+ public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
+
+ CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).waitAtServer(timeoutMS).build();
+ consumer.setApiCredentials(apiKey, secretKey);
+ return consumer;
+ }
+
+ public void closeConsumer(CambriaConsumer consumer) {
+
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ }
+
+ /**
+ * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
+ *
+ * @param topicConsumer
+ * @return
+ */
+ public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
+
+ try {
+ Iterable<String> messages = topicConsumer.fetch();
+ if (messages == null) {
+ messages = new ArrayList<String>();
+ }
+ return Either.left(messages);
+
+ } catch (IOException e) {
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
+
+ logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
+
+ return Either.right(cambriaErrorResponse);
+
+ } catch (Exception e) {
+ logger.debug("Failed to fetch from U-EB topic", e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+
+ CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ return Either.right(cambriaErrorResponse);
+ }
+ }
+
+ /**
+ * Publish notification message to a given queue
+ *
+ * @param topicName
+ * @param uebPublicKey
+ * @param uebSecretKey
+ * @param uebServers
+ * @param data
+ * @return
+ */
+ public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
+
+ CambriaBatchingPublisher createSimplePublisher = null;
+
+ try {
+
+ String json = gson.toJson(data);
+ logger.trace("Before sending notification data {} to topic {}", json, topicName);
+
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
- private static final String PARTITION_KEY = "asdc" + "aa";
-
- private final String SEND_NOTIFICATION = "send notification";
-
- private Gson gson = new Gson();
-
- public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap();
+ int result = createSimplePublisher.send(PARTITION_KEY, json);
+ try {
+ Thread.sleep(1 * 1000);
+ } catch (InterruptedException e) {
+ logger.debug("Failed during sleep after sending the message.", e);
+ }
- /**
- * process the response error from Cambria client
- *
- * @param message
- * @return
- */
- private Integer processMessageException(String message) {
-
- String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" };
-
- Integer result = checkPattern(patterns[0], message, 2);
- if (result != null) {
- return result;
- }
- result = checkPattern(patterns[1], message, 2);
-
- return result;
-
- }
-
- /**
- * check whether the message has a match with a given pattern inside it
- *
- * @param patternStr
- * @param message
- * @param groupIndex
- * @return
- */
- private Integer checkPattern(String patternStr, String message, int groupIndex) {
- Integer result = null;
-
- Pattern pattern = Pattern.compile(patternStr);
- Matcher matcher = pattern.matcher(message);
- boolean find = matcher.find();
- if (find) {
- String httpCode = matcher.group(groupIndex);
- if (httpCode != null) {
- try {
- result = Integer.valueOf(httpCode);
- } catch (NumberFormatException e) {
- logger.debug("Failed to parse http code {}", httpCode);
- }
- }
- }
- return result;
- }
-
- /**
- * retrieve all topics from U-EB server
- *
- * @param hostSet
- * @return
- */
- public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) {
-
- CambriaTopicManager createTopicManager = null;
- try {
-
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet));
-
- Set<String> topics = createTopicManager.getTopics();
-
- if (topics == null || true == topics.isEmpty()) {
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null);
- return Either.right(cambriaErrorResponse);
- }
-
- return Either.left(topics);
-
- } catch (IOException | GeneralSecurityException e) {
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- logger.debug("Failed to fetch topics from U-EB server", e);
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics");
-
- return Either.right(cambriaErrorResponse);
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- }
-
- /**
- * process the error message from Cambria client.
- *
- * set Cambria status and http code in case we succeed to fetch it
- *
- * @return
- */
- private CambriaErrorResponse processError(Exception e) {
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse();
-
- Integer httpCode = processMessageException(e.getMessage());
-
- if (httpCode != null) {
- cambriaErrorResponse.setHttpCode(httpCode);
- switch (httpCode.intValue()) {
-
- case 401:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR);
- break;
- case 409:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST);
- break;
- case 500:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR);
- break;
- default:
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
- }
- } else {
-
- boolean found = false;
- Throwable throwable = e.getCause();
- if (throwable != null) {
- String message = throwable.getMessage();
-
- Throwable cause = throwable.getCause();
-
- if (cause != null) {
- Class<?> clazz = cause.getClass();
- String className = clazz.getName();
- if (className.endsWith("UnknownHostException")) {
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR);
- cambriaErrorResponse.addVariable(message);
- found = true;
- }
- }
- }
-
- if (false == found) {
- cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR);
- cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- }
- }
-
- return cambriaErrorResponse;
- }
-
- /**
- * write the error to the log
- *
- * @param cambriaErrorResponse
- * @param errorMessage
- * @param methodName
- * @param operationDesc
- */
- private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) {
-
- String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()));
-
- switch (cambriaErrorResponse.getOperationStatus()) {
- case UNKNOWN_HOST_ERROR:
- String hostname = cambriaErrorResponse.getVariables().get(0);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname);
- BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode);
- break;
- case AUTHENTICATION_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode);
- BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode);
- break;
- case CONNNECTION_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode);
- BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode);
- break;
-
- case INTERNAL_SERVER_ERROR:
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc);
- BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc);
- break;
- default:
- break;
- }
-
- }
-
- /**
- * create a topic if it does not exists in the topicsList
- *
- * @param hostSet
- * - list of U-EB servers
- * @param apiKey
- * @param secretKey
- * @param topicName
- * - topic to create
- * @param partitionCount
- * @param replicationCount
- * @return
- */
- public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) {
-
- CambriaTopicManager createTopicManager = null;
- try {
-
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey));
-
- createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount);
-
- } catch (HttpException | IOException | GeneralSecurityException e) {
-
- logger.debug("Failed to create topic {}", topicName, e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic");
- }
-
- return cambriaErrorResponse;
-
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
- return new CambriaErrorResponse(CambriaOperationStatus.OK);
-
- }
-
- public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
- CambriaTopicManager createTopicManager = null;
- try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
-
- if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
- createTopicManager.revokeProducer(topicName, subscriberApiKey);
- } else {
- createTopicManager.revokeConsumer(topicName, subscriberApiKey);
- }
-
- } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
- logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
- return cambriaErrorResponse;
-
- } catch (HttpException | IOException e) {
- logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase());
-
- return cambriaErrorResponse;
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
- return cambriaErrorResponse;
- }
-
- /**
- *
- * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER
- *
- * @param hostSet
- * @param topicName
- * @param managerApiKey
- * @param managerSecretKey
- * @param subscriberApiKey
- * @param subscriberTypeEnum
- * @return
- */
- public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) {
-
- CambriaTopicManager createTopicManager = null;
- try {
- createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey));
-
- if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) {
- createTopicManager.allowProducer(topicName, subscriberApiKey);
- } else {
- createTopicManager.allowConsumer(topicName, subscriberApiKey);
- }
-
- } catch (HttpObjectNotFoundException | GeneralSecurityException e) {
- logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage());
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND);
- return cambriaErrorResponse;
-
- } catch (HttpException | IOException e) {
- logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase());
-
- return cambriaErrorResponse;
- } finally {
- if (createTopicManager != null) {
- createTopicManager.close();
- }
- }
-
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK);
- return cambriaErrorResponse;
- }
-
- /**
- * create and retrieve a Cambria Consumer for a specific topic
- *
- * @param hostSet
- * @param topicName
- * @param apiKey
- * @param secretKey
- * @param consumerId
- * @param consumerGroup
- * @param timeoutMS
- * @return
- * @throws Exception
- */
- public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception {
-
- CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build();
- consumer.setApiCredentials(apiKey, secretKey);
- return consumer;
- }
-
- public void closeConsumer(CambriaConsumer consumer) {
-
- if (consumer != null) {
- consumer.close();
- }
-
- }
-
- /**
- * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error)
- *
- * @param topicConsumer
- * @return
- */
- public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) {
-
- try {
- Iterable<String> messages = topicConsumer.fetch();
- if (messages == null) {
- messages = new ArrayList<String>();
- }
- return Either.left(messages);
-
- } catch (IOException e) {
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- CambriaErrorResponse cambriaErrorResponse = processError(e);
-
- logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage());
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic");
-
- return Either.right(cambriaErrorResponse);
-
- } catch (Exception e) {
- logger.debug("Failed to fetch from U-EB topic", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
+ logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
- CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR);
- return Either.right(cambriaErrorResponse);
- }
- }
+ return response;
- /**
- * Publish notification message to a given queue
- *
- * @param topicName
- * @param uebPublicKey
- * @param uebSecretKey
- * @param uebServers
- * @param data
- * @return
- */
- public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) {
+ } catch (IOException | GeneralSecurityException e) {
+ logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
- CambriaBatchingPublisher createSimplePublisher = null;
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
- try {
+ CambriaErrorResponse cambriaErrorResponse = processError(e);
- String json = gson.toJson(data);
- logger.trace("Before sending notification data {} to topic {}", json, topicName);
+ writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
- createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
-
- int result = createSimplePublisher.send(PARTITION_KEY, json);
-
- try {
- Thread.sleep(1 * 1000);
- } catch (InterruptedException e) {
- logger.debug("Failed during sleep after sending the message.", e);
- }
-
- logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
-
- CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
-
- return response;
+ return cambriaErrorResponse;
+ } finally {
+ if (createSimplePublisher != null) {
+ logger.debug("Before closing publisher");
+ createSimplePublisher.close();
+ logger.debug("After closing publisher");
+ }
+ }
+ }
- } catch (IOException | GeneralSecurityException e) {
- logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
+ public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
+ CambriaBatchingPublisher createSimplePublisher = null;
- CambriaErrorResponse cambriaErrorResponse = processError(e);
+ CambriaErrorResponse response = null;
+ try {
- writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION);
+ String json = gson.toJson(data);
+ logger.debug("Before sending notification data {} to topic {}", json, topicName);
- return cambriaErrorResponse;
- } finally {
- if (createSimplePublisher != null) {
- logger.debug("Before closing publisher");
- createSimplePublisher.close();
- logger.debug("After closing publisher");
- }
- }
- }
+ createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build();
+ createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
- private String convertListToString(List<String> list) {
- StringBuilder builder = new StringBuilder();
+ int result = createSimplePublisher.send(PARTITION_KEY, json);
- if (list != null) {
- for (int i = 0; i < list.size(); i++) {
- builder.append(list.get(i));
- if (i < list.size() - 1) {
- builder.append(",");
- }
- }
- }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.debug("Failed during sleep after sending the message.", e);
+ }
- return builder.toString();
- }
+ logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
- public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) {
+ } catch (IOException | GeneralSecurityException e) {
+ logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
- CambriaBatchingPublisher createSimplePublisher = null;
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
- CambriaErrorResponse response = null;
- try {
+ response = processError(e);
- String json = gson.toJson(data);
- logger.debug("Before sending notification data {} to topic {}", json, topicName);
+ writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION);
- createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build();
- createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey);
+ return response;
- int result = createSimplePublisher.send(PARTITION_KEY, json);
+ }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.debug("Failed during sleep after sending the message.", e);
- }
+ logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
+ try {
+ List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
+ if (messagesInQ != null && false == messagesInQ.isEmpty()) {
+ logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+ writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
+ } else {
+ logger.debug("No message left in the queue after closing cambria publisher");
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+ }
+ } catch (IOException | InterruptedException e) {
+ logger.debug("Failed to close cambria publisher", e);
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+ writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
+ }
+ logger.debug("After closing publisher");
- logger.debug("After sending notification data to topic {}. result is {}", topicName, result);
+ return response;
- } catch (IOException | GeneralSecurityException e) {
- logger.debug("Failed to send notification {} to topic {} ", data, topicName, e);
+ }
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
+ public CambriaErrorResponse getApiKey(String server, String apiKey) {
- response = processError(e);
+ CambriaErrorResponse response = null;
- writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION);
+ List<String> hostSet = new ArrayList<>();
+ hostSet.add(server);
+ try {
+ CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
+ createIdentityManager.getApiKey(apiKey);
- return response;
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
- }
+ } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
+ logger.debug("Failed to fetch api key {} from server {}", apiKey, server, e);
- logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout);
- try {
- List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS);
- if (messagesInQ != null && false == messagesInQ.isEmpty()) {
- logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size());
- response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
- writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
- } else {
- logger.debug("No message left in the queue after closing cambria publisher");
- response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
- }
- } catch (IOException | InterruptedException e) {
- logger.debug("Failed to close cambria publisher", e);
- response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
- writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION);
- }
- logger.debug("After closing publisher");
+ response = processError(e);
- return response;
+ }
- }
+ return response;
+ }
- public CambriaErrorResponse getApiKey(String server, String apiKey) {
+ public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) {
+ Either<ApiCredential, CambriaErrorResponse> result;
- CambriaErrorResponse response = null;
+ try {
+ CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
- List<String> hostSet = new ArrayList<>();
- hostSet.add(server);
- CambriaIdentityManager createIdentityManager = null;
- try {
- createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet));
- createIdentityManager.getApiKey(apiKey);
- response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+ String description = String.format("ASDC Key for %s", CONSUMER_ID);
+ ApiCredential credential = createIdentityManager.createApiKey("", description);
+ createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
+ result = Either.left(credential);
- } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) {
- logger.debug("Failed to fetch api key {} from server ", apiKey, server, e);
+ } catch (Exception e) {
+ logger.debug("Failed to create ueb keys for servers {}",hostSet, e);
- response = processError(e);
+ result = Either.right(processError(e));
- }
+ }
- return response;
- }
+ return result;
+ }
- private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
- if (useHttpsWithDmaap) {
- client.usingHttps();
- }
- return (T)client.build();
- }
+ @VisibleForTesting
+ <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException {
+ return (T)client.build();
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java
new file mode 100644
index 0000000000..94fff3cfaa
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.att.aft.dme2.api.DME2Exception;
+import com.att.aft.dme2.api.DME2Manager;
+import com.att.aft.dme2.iterator.DME2EndpointIterator;
+import com.att.aft.dme2.iterator.factory.DME2EndpointIteratorFactory;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class DME2EndpointIteratorCreator {
+
+ public DME2EndpointIterator create(String lookupURI) throws DME2Exception {
+ // Initializing DME2Manager instance
+ DME2Manager manager = DME2Manager.getDefaultInstance();
+ // Returning an instance of the DME2EndpointIteratorFactory
+ return (DME2EndpointIterator) DME2EndpointIteratorFactory.getInstance().getIterator(lookupURI, null, null, manager);
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java
deleted file mode 100644
index 5e4c08275f..0000000000
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.be.components.distribution.engine;
-
-public enum DeConfigurationStatus {
-
- OK(""), MISSING_CONFIGURATION("");
-
- private String description;
-
- DeConfigurationStatus(String description) {
- this.description = description;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java
index bd3d74e323..7d2d4680b5 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java
@@ -20,361 +20,340 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.sdc.be.components.validation.ServiceDistributionValidation;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
-import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
import org.openecomp.sdc.be.model.Service;
import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
-import org.openecomp.sdc.common.config.EcompErrorName;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.common.util.YamlToObjectConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import fj.data.Either;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@Component("distributionEngine")
public class DistributionEngine implements IDistributionEngine {
- public static final Pattern FQDN_PATTERN = Pattern.compile("^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,4})*$", Pattern.CASE_INSENSITIVE);
-
- public static void main(String[] args) {
-
- List<String> servers = new ArrayList<>();
-
- servers.add("uebsb91kcdc.it.att.com:3904");
- servers.add("uebsb91kcdc.it.att.com:3904");
- servers.add("uebsb91kcdc.it.att.com:3904");
-
- YamlToObjectConverter converter = new YamlToObjectConverter();
- DistributionEngineConfiguration distributionEngineConfiguration = converter.convert("src/test/resources/config/catalog-be/distribEngine1/distribution-engine-configuration.yaml", DistributionEngineConfiguration.class);
-
- DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(2l, distributionEngineConfiguration, "PROD", new AtomicBoolean(false), null, null);
- distributionEngineInitTask.startTask();
-
- }
-
- @javax.annotation.Resource
- private ComponentsUtils componentUtils;
-
- @javax.annotation.Resource
- private DistributionNotificationSender distributionNotificationSender;
-
- @javax.annotation.Resource
- private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
-
- @javax.annotation.Resource
- private DistributionEngineClusterHealth distributionEngineClusterHealth;
-
- private static Logger logger = LoggerFactory.getLogger(DistributionEngine.class.getName());
-
- private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<String, DistributionEngineInitTask>();
- private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<String, DistributionEnginePollingTask>();
-
- private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<String, AtomicBoolean>();
-
- @Override
- public boolean isActive() {
-
- if (true == envNamePerInitTask.isEmpty()) {
- return false;
- }
-
- for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
- boolean active = task.isActive();
- if (active == false) {
- return false;
- }
- }
- return true;
- }
-
- @PostConstruct
- private void init() {
-
- logger.trace("Enter init method of DistributionEngine");
-
- DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
-
- boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine();
- logger.debug("Distribution engine activation parameter is {}", startDistributionEngine);
- if (false == startDistributionEngine) {
- logger.info("The disribution engine is disabled");
-
- this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled();
-
- return;
- }
-
- boolean isValidConfig = validateConfiguration(distributionEngineConfiguration);
-
- if (false == isValidConfig) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase");
- BeEcompErrorManager.getInstance().logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase");
-
- this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError();
- return;
- }
-
- List<String> environments = distributionEngineConfiguration.getEnvironments();
-
- for (String envName : environments) {
-
- DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(distributionEngineConfiguration, envName, componentUtils, distributionEngineClusterHealth);
-
- logger.debug("Init task for environment {}", envName);
-
- AtomicBoolean status = new AtomicBoolean(false);
- envNamePerStatus.put(envName, status);
- DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l, distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask);
- distributionEngineInitTask.startTask();
- envNamePerInitTask.put(envName, distributionEngineInitTask);
- envNamePerPollingTask.put(envName, distributionEnginePollingTask);
- }
-
- logger.debug("Init UEB health check");
- distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus);
-
- logger.trace("Exit init method of DistributionEngine");
-
- }
-
- @PreDestroy
- public void shutdown() {
- logger.info("distribution engine shutdown - start");
- if (envNamePerInitTask != null) {
- for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
- task.destroy();
- }
- }
- if (envNamePerPollingTask != null) {
- for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) {
- task.destroy();
- }
- }
-
- }
-
- /**
- * validate mandatory configuration parameters received
- *
- * @param deConfiguration
- * @return
- */
- protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) {
-
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- boolean result = true;
- result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result;
- result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments") && result;
- result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result;
- result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result;
- result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
- result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
- result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result;
- result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
- result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
- result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
- result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
- result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result;
- result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result;
- result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result;
-
- return result;
- }
-
- private boolean isValidServers(List<String> uebServers, String methodName, String paramName) {
-
- if (uebServers == null || uebServers.size() == 0) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName);
- BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
- return false;
- }
-
- if (uebServers.size() < 2) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeConfigurationInvalidListSizeError, methodName, paramName, "2");
- BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2);
- return false;
- }
-
- for (String serverFqdn : uebServers) {
- if (false == isValidFqdn(serverFqdn)) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeInvalidConfigurationError, methodName, paramName, serverFqdn);
- BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn);
- return false;
- }
- }
-
- return true;
- }
-
- private boolean isValidFqdn(String serverFqdn) {
-
- try {
- Matcher matcher = FQDN_PATTERN.matcher(serverFqdn);
- return matcher.matches();
-
- } catch (Exception e) {
- logger.debug("Failed to match value of address {}", serverFqdn, e);
- return false;
- }
-
- }
-
- private boolean isEmptyParam(String param) {
-
- if (param == null || true == param.isEmpty()) {
- return true;
- }
-
- return false;
- }
-
- private boolean isValidParam(String paramValue, String methodName, String paramName) {
-
- if (isEmptyParam(paramValue)) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName);
- BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
- return false;
- }
- return true;
-
- }
-
- private boolean isValidParam(List<String> paramValue, String methodName, String paramName) {
-
- if (isEmptyList(paramValue)) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName);
- BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
- return false;
- }
- return true;
-
- }
-
- private boolean isValidObject(Object paramValue, String methodName, String paramName) {
-
- if (paramValue == null) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName);
- BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
- return false;
- }
- return true;
-
- }
-
- private boolean isEmptyList(List<String> list) {
- if (list == null || true == list.isEmpty()) {
- return true;
- }
- return false;
- }
-
- private String getEnvironmentErrorDescription(StorageOperationStatus status) {
-
- switch (status) {
- case DISTR_ENVIRONMENT_NOT_AVAILABLE:
- return "environment is unavailable";
- case DISTR_ENVIRONMENT_NOT_FOUND:
- return "environment is not configured in our system";
- case DISTR_ENVIRONMENT_SENT_IS_INVALID:
- return "environment name is invalid";
-
- default:
- return "unkhown";
-
- }
- }
-
- public StorageOperationStatus isEnvironmentAvailable(String envName) {
-
- if (envName == null || true == envName.isEmpty()) {
-
- return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID;
- }
-
- AtomicBoolean status = envNamePerStatus.get(envName);
- if (status == null) {
- return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND;
- }
-
- if (false == status.get()) {
- return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE;
- }
- return StorageOperationStatus.OK;
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(DistributionEngine.class);
+ private static final Pattern FQDN_PATTERN = Pattern.compile("^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,4})*$", Pattern.CASE_INSENSITIVE);
- public StorageOperationStatus isEnvironmentAvailable() {
+ @Autowired
+ private EnvironmentsEngine environmentsEngine;
- String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0);
+ @Resource
+ private DistributionNotificationSender distributionNotificationSender;
- return isEnvironmentAvailable(envName);
- }
+ @Resource
+ private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
- @Override
- public void disableEnvironment(String envName) {
- // TODO disable tasks
- AtomicBoolean status = envNamePerStatus.get(envName);
- status.set(false);
- }
+ @Resource
+ private DistributionEngineClusterHealth distributionEngineClusterHealth;
- @Override
- public StorageOperationStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName) {
+ @Resource
+ private ServiceDistributionValidation serviceDistributionValidation;
- logger.debug("Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", distributionId, service.getUUID(), service.getUniqueId(), envName, userId, modifierName);
+ private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
+ private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
+ private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
- DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ /**
+ * The main method for testing only
+ * @param args
+ */
+ public static void main(String[] args) {
- String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName();
- String topicName = DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName);
+ List<String> servers = new ArrayList<>();
+ String server = "uebsb91kcdc.it.att.com:3904";
+ servers.add(server);
+ servers.add(server);
+ servers.add(server);
- StorageOperationStatus sendNotification = distributionNotificationSender.sendNotification(topicName, distributionId, deConfiguration, envName, notificationData, service, userId, modifierName);
+ YamlToObjectConverter converter = new YamlToObjectConverter();
+ DistributionEngineConfiguration distributionEngineConfiguration = converter.convert("src/test/resources/config/catalog-be/distribEngine1/distribution-engine-configuration.yaml", DistributionEngineConfiguration.class);
- logger.debug("Finish notifyService. status is {}", sendNotification);
+ DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(2l, distributionEngineConfiguration, "PROD", new AtomicBoolean(false), null, null, null);
+ distributionEngineInitTask.startTask();
+
+ }
- return sendNotification;
- }
+ @Override
+ public boolean isActive() {
- @Override
- public Either<INotificationData, StorageOperationStatus> isReadyForDistribution(Service service, String distributionId, String envName) {
- StorageOperationStatus status = isEnvironmentAvailable(envName);
- if (status != StorageOperationStatus.OK) {
- String envErrorDec = getEnvironmentErrorDescription(status);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING,
- "Environment name " + envName + " is not available. Reason : " + envErrorDec);
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, "Environment name " + envName + " is not available. Reason : " + envErrorDec);
- return Either.right(status);
- }
+ if (envNamePerInitTask.isEmpty()) {
+ return false;
+ }
- Either<Boolean, StorageOperationStatus> isServiceContainsDeploymentArtifactsStatus = serviceDistributionArtifactsBuilder.isServiceContainsDeploymentArtifacts(service);
- if (isServiceContainsDeploymentArtifactsStatus.isRight()) {
- StorageOperationStatus operationStatus = isServiceContainsDeploymentArtifactsStatus.right().value();
- return Either.right(operationStatus);
- } else {
- Boolean isDeploymentArtifactExists = isServiceContainsDeploymentArtifactsStatus.left().value();
- if (isDeploymentArtifactExists == null || isDeploymentArtifactExists.booleanValue() == false) {
- return Either.right(StorageOperationStatus.DISTR_ARTIFACT_NOT_FOUND);
- }
- }
+ for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
+ boolean active = task.isActive();
+ if (!active) {
+ return false;
+ }
+ }
+ return true;
+ }
- INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId);
- value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);
+ @PostConstruct
+ private void init() {
+
+ LOGGER.trace("Enter init method of DistributionEngine");
+
+ DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+
+ boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine();
+ LOGGER.debug("Distribution engine activation parameter is {}", startDistributionEngine);
+ if (!startDistributionEngine) {
+ LOGGER.info("The disribution engine is disabled");
- return Either.left(value);
- }
+ this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled();
+
+ return;
+ }
+
+ boolean isValidConfig = validateConfiguration(distributionEngineConfiguration);
+
+ if (!isValidConfig) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase");
+
+ this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError();
+ return;
+ }
+
+ List<String> environments = distributionEngineConfiguration.getEnvironments();
+
+ for (String envName : environments) {
+ LOGGER.debug("init task for environment {}", envName);
+ AtomicBoolean status = new AtomicBoolean(false);
+ envNamePerStatus.put(envName, status);
+ environmentsEngine.connectUebTopicForDistributionConfTopic(envName, status, envNamePerInitTask, envNamePerPollingTask);
+ }
+
+ LOGGER.debug("init UEB health check");
+ distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus);
+
+ LOGGER.trace("Exit init method of DistributionEngine");
+
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ LOGGER.info("distribution engine shutdown - start");
+ if (envNamePerInitTask != null) {
+ for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
+ task.destroy();
+ }
+ }
+ if (envNamePerPollingTask != null) {
+ for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) {
+ task.destroy();
+ }
+ }
+
+ }
+
+ /**
+ * validate mandatory configuration parameters received
+ *
+ * @param deConfiguration
+ * @return
+ */
+ protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) {
+
+ String methodName = "validateConfiguration";
+
+ boolean result = true;
+ result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result;
+ result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments") && result;
+ result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result;
+ result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result;
+ result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
+ result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
+ result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result;
+ result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
+ result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
+ result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
+ result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
+ result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result;
+ result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result;
+ result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result;
+
+ return result;
+ }
+
+ private boolean isValidServers(List<String> uebServers, String methodName, String paramName) {
+
+ if (uebServers == null || uebServers.isEmpty()) {
+ BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
+ return false;
+ }
+
+ if (uebServers.size() < 2) {
+ BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2);
+ return false;
+ }
+
+ for (String serverFqdn : uebServers) {
+ if (!isValidFqdn(serverFqdn)) {
+ BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private boolean isValidFqdn(String serverFqdn) {
+
+ try {
+ Matcher matcher = FQDN_PATTERN.matcher(serverFqdn);
+ return matcher.matches();
+
+ } catch (Exception e) {
+ LOGGER.debug("Failed to match value of address {}", serverFqdn, e);
+ return false;
+ }
+ }
+
+ private boolean isValidParam(String paramValue, String methodName, String paramName) {
+
+ if (StringUtils.isEmpty(paramValue)) {
+ BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isValidParam(List<String> paramValue, String methodName, String paramName) {
+
+ if (CollectionUtils.isEmpty(paramValue)) {
+ BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isValidObject(Object paramValue, String methodName, String paramName) {
+
+ if (paramValue == null) {
+ BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
+ return false;
+ }
+ return true;
+
+ }
+
+ private String getEnvironmentErrorDescription(StorageOperationStatus status) {
+
+ switch (status) {
+ case DISTR_ENVIRONMENT_NOT_AVAILABLE:
+ return "environment is unavailable";
+ case DISTR_ENVIRONMENT_NOT_FOUND:
+ return "environment is not configured in our system";
+ case DISTR_ENVIRONMENT_SENT_IS_INVALID:
+ return "environment name is invalid";
+ default:
+ return "unkhown";
+ }
+ }
+
+ @Override
+ public StorageOperationStatus isEnvironmentAvailable(String envName) {
+
+ if (envName == null || envName.isEmpty()) {
+
+ return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID;
+ }
+
+ AtomicBoolean status = envNamePerStatus.get(envName);
+ if (status == null) {
+ return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND;
+ }
+
+ if (!status.get()) {
+ return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE;
+ }
+ return StorageOperationStatus.OK;
+ }
+
+ @Override
+ public StorageOperationStatus isEnvironmentAvailable() {
+
+ String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0);
+
+ return isEnvironmentAvailable(envName);
+ }
+
+ @Override
+ public void disableEnvironment(String envName) {
+ AtomicBoolean status = envNamePerStatus.get(envName);
+ status.set(false);
+ }
+
+ @Override
+ public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName) {
+ return notifyService(distributionId, service, notificationData, envName, envName, userId, modifierName);
+ }
+
+ @Override
+ public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName, String userId, String modifierName) {
+ LOGGER.debug("Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", distributionId, service.getUUID(), service.getUniqueId(), envName, userId, modifierName);
+ String topicName = buildTopicName(envName);
+ ActionStatus notifyServiceStatus = Optional.ofNullable(environmentsEngine.getEnvironmentById(envId))
+ .map(EnvironmentMessageBusData::new)
+ .map(messageBusData -> distributionNotificationSender.sendNotification(topicName, distributionId, messageBusData, notificationData, service, userId, modifierName))
+ .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE);
+ LOGGER.debug("Finish notifyService. status is {}", notifyServiceStatus);
+ return notifyServiceStatus;
+ }
+
+ private String buildTopicName(String envName) {
+ DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName();
+ return DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName);
+ }
+
+ @Override
+ public StorageOperationStatus isReadyForDistribution(Service service, String envName) {
+ StorageOperationStatus status = isEnvironmentAvailable(envName);
+ if (status != StorageOperationStatus.OK) {
+ String envErrorDec = getEnvironmentErrorDescription(status);
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, "Environment name " + envName + " is not available. Reason : " + envErrorDec);
+ return status;
+ }
+
+ return verifyServiceHasDeploymentArtifacts(service);
+ }
+
+ @Override
+ public StorageOperationStatus verifyServiceHasDeploymentArtifacts(Service service) {
+ if (!serviceDistributionArtifactsBuilder.verifyServiceContainsDeploymentArtifacts(service)) {
+ return StorageOperationStatus.DISTR_ARTIFACT_NOT_FOUND;
+ }
+ return StorageOperationStatus.OK;
+ }
+
+ @Override
+ public OperationalEnvironmentEntry getEnvironmentById(String opEnvId) {
+ return environmentsEngine.getEnvironmentById(opEnvId);
+ }
+
+ @Override
+ public INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext) {
+ INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId, workloadContext);
+ value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);
+ return value;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java
index 85a868f156..9599879006 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java
@@ -20,22 +20,6 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
@@ -47,311 +31,318 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
@Component("distribution-engine-cluster-health")
public class DistributionEngineClusterHealth {
- protected static String UEB_HEALTH_LOG_CONTEXT = "ueb.healthcheck";
+ protected static String UEB_HEALTH_LOG_CONTEXT = "ueb.healthcheck";
- private static Logger healthLogger = LoggerFactory.getLogger(UEB_HEALTH_LOG_CONTEXT);
+ private static final Logger healthLogger = LoggerFactory.getLogger(UEB_HEALTH_LOG_CONTEXT);
- private static final String UEB_HEALTH_CHECK_STR = "uebHealthCheck";
+ private static final String UEB_HEALTH_CHECK_STR = "uebHealthCheck";
- boolean lastHealthState = false;
+ boolean lastHealthState = false;
- Object lockOject = new Object();
+ Object lockOject = new Object();
- private long reconnectInterval = 5;
+ private long reconnectInterval = 5;
- private long healthCheckReadTimeout = 20;
+ private long healthCheckReadTimeout = 20;
- private static Logger logger = LoggerFactory.getLogger(DistributionEngineClusterHealth.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(DistributionEngineClusterHealth.class);
- private List<String> uebServers = null;
+ private List<String> uebServers = null;
- private String publicApiKey = null;
+ private String publicApiKey = null;
- public enum HealthCheckInfoResult {
+ public enum HealthCheckInfoResult {
- OK(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.UP, null, ClusterStatusDescription.OK.getDescription())),
- UNAVAILABLE(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.UNAVAILABLE.getDescription())),
- NOT_CONFIGURED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.NOT_CONFIGURED.getDescription())),
- DISABLED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.DISABLED.getDescription()));
+ OK(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.UP, null, ClusterStatusDescription.OK.getDescription())),
+ UNAVAILABLE(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.UNAVAILABLE.getDescription())),
+ NOT_CONFIGURED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.NOT_CONFIGURED.getDescription())),
+ DISABLED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.DISABLED.getDescription()));
- private HealthCheckInfo healthCheckInfo;
+ private HealthCheckInfo healthCheckInfo;
- HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) {
- this.healthCheckInfo = healthCheckInfo;
- }
+ HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) {
+ this.healthCheckInfo = healthCheckInfo;
+ }
- public HealthCheckInfo getHealthCheckInfo() {
- return healthCheckInfo;
- }
+ public HealthCheckInfo getHealthCheckInfo() {
+ return healthCheckInfo;
+ }
- }
+ }
- private HealthCheckInfo healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo();
+ private HealthCheckInfo healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo();
- private Map<String, AtomicBoolean> envNamePerStatus = null;
+ private Map<String, AtomicBoolean> envNamePerStatus = null;
- private ScheduledFuture<?> scheduledFuture = null;
+ private ScheduledFuture<?> scheduledFuture = null;
- ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "UEB-Health-Check-Task");
- }
- });
+ ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "UEB-Health-Check-Task");
+ }
+ });
- HealthCheckScheduledTask healthCheckScheduledTask = null;
+ HealthCheckScheduledTask healthCheckScheduledTask = null;
- public enum ClusterStatusDescription {
+ public enum ClusterStatusDescription {
- OK("OK"), UNAVAILABLE("U-EB cluster is not available"), NOT_CONFIGURED("U-EB cluster is not configured"), DISABLED("DE is disabled in configuration");
+ OK("OK"), UNAVAILABLE("U-EB cluster is not available"), NOT_CONFIGURED("U-EB cluster is not configured"), DISABLED("DE is disabled in configuration");
- private String desc;
+ private String desc;
- ClusterStatusDescription(String desc) {
- this.desc = desc;
- }
+ ClusterStatusDescription(String desc) {
+ this.desc = desc;
+ }
- public String getDescription() {
- return desc;
- }
+ public String getDescription() {
+ return desc;
+ }
- }
+ }
- /**
- * Health Check Task Scheduler.
- *
- * It schedules a task which send a apiKey get query towards the UEB servers. In case a query to the first UEB server is failed, then a second query is sent to the next UEB server.
- *
- *
- * @author esofer
- *
- */
- public class HealthCheckScheduledTask implements Runnable {
+ /**
+ * Health Check Task Scheduler.
+ *
+ * It schedules a task which send a apiKey get query towards the UEB servers. In case a query to the first UEB server is failed, then a second query is sent to the next UEB server.
+ *
+ *
+ * @author esofer
+ *
+ */
+ public class HealthCheckScheduledTask implements Runnable {
- List<UebHealthCheckCall> healthCheckCalls = new ArrayList<>();
+ List<UebHealthCheckCall> healthCheckCalls = new ArrayList<>();
- public HealthCheckScheduledTask(List<String> uebServers) {
+ public HealthCheckScheduledTask(List<String> uebServers) {
- logger.debug("Create health check calls for servers {}", uebServers);
- if (uebServers != null) {
- for (String server : uebServers) {
- healthCheckCalls.add(new UebHealthCheckCall(server, publicApiKey));
- }
- }
- }
+ logger.debug("Create health check calls for servers {}", uebServers);
+ if (uebServers != null) {
+ for (String server : uebServers) {
+ healthCheckCalls.add(new UebHealthCheckCall(server, publicApiKey));
+ }
+ }
+ }
- @Override
- public void run() {
+ @Override
+ public void run() {
- healthLogger.trace("Executing UEB Health Check Task - Start");
+ healthLogger.trace("Executing UEB Health Check Task - Start");
- boolean healthStatus = verifyAtLeastOneEnvIsUp();
+ boolean healthStatus = verifyAtLeastOneEnvIsUp();
- if (true == healthStatus) {
- boolean queryUebStatus = queryUeb();
- if (queryUebStatus == lastHealthState) {
- return;
- }
+ if (true == healthStatus) {
+ boolean queryUebStatus = queryUeb();
+ if (queryUebStatus == lastHealthState) {
+ return;
+ }
- synchronized (lockOject) {
- if (queryUebStatus != lastHealthState) {
- logger.trace("UEB Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
- lastHealthState = queryUebStatus;
- logAlarm(lastHealthState);
- if (true == queryUebStatus) {
- healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo();
- } else {
- healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo();
- }
- }
- }
- } else {
- healthLogger.trace("Not all UEB Environments are up");
- }
-
- }
-
- /**
- * verify that at least one environment is up.
- *
- */
- private boolean verifyAtLeastOneEnvIsUp() {
-
- boolean healthStatus = false;
+ synchronized (lockOject) {
+ if (queryUebStatus != lastHealthState) {
+ logger.trace("UEB Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
+ lastHealthState = queryUebStatus;
+ logAlarm(lastHealthState);
+ if (true == queryUebStatus) {
+ healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo();
+ } else {
+ healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo();
+ }
+ }
+ }
+ } else {
+ healthLogger.trace("Not all UEB Environments are up");
+ }
+
+ }
+
+ /**
+ * verify that at least one environment is up.
+ *
+ */
+ private boolean verifyAtLeastOneEnvIsUp() {
+
+ boolean healthStatus = false;
- if (envNamePerStatus != null) {
- Collection<AtomicBoolean> values = envNamePerStatus.values();
- if (values != null) {
- for (AtomicBoolean status : values) {
- if (true == status.get()) {
- healthStatus = true;
- break;
- }
- }
- }
- }
-
- return healthStatus;
- }
-
- /**
- * executor for the query itself
- */
- ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "UEB-Health-Check-Thread");
- }
- });
-
- /**
- * go all UEB servers and send a get apiKeys query. In case a query is succeed, no query is sent to the rest of UEB servers.
- *
- *
- * @return
- */
- private boolean queryUeb() {
-
- Boolean result = false;
- int retryNumber = 1;
- for (UebHealthCheckCall healthCheckCall : healthCheckCalls) {
- try {
-
- healthLogger.debug("Before running Health Check retry query number {} towards UEB server {}", retryNumber, healthCheckCall.getServer());
-
- Future<Boolean> future = healthCheckExecutor.submit(healthCheckCall);
- result = future.get(healthCheckReadTimeout, TimeUnit.SECONDS);
-
- healthLogger.debug("After running Health Check retry query number {} towards UEB server {}. Result is {}", retryNumber, healthCheckCall.getServer(), result);
-
- if (result != null && true == result.booleanValue()) {
- break;
- }
-
- } catch (Exception e) {
- String message = e.getMessage();
- if (message == null) {
- message = e.getClass().getName();
- }
- healthLogger.debug("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message);
- healthLogger.trace("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message, e);
- }
- retryNumber++;
-
- }
-
- return result;
-
- }
-
- public List<UebHealthCheckCall> getHealthCheckCalls() {
- return healthCheckCalls;
- }
-
- }
-
- @PostConstruct
- private void init() {
-
- logger.trace("Enter init method of DistributionEngineClusterHealth");
-
- Long reconnectIntervalConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReconnectIntervalInSeconds();
- if (reconnectIntervalConfig != null) {
- reconnectInterval = reconnectIntervalConfig.longValue();
- }
- Long healthCheckReadTimeoutConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReadTimeout();
- if (healthCheckReadTimeoutConfig != null) {
- healthCheckReadTimeout = healthCheckReadTimeoutConfig.longValue();
- }
-
- DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
-
- this.uebServers = distributionEngineConfiguration.getUebServers();
- this.publicApiKey = distributionEngineConfiguration.getUebPublicKey();
-
- this.healthCheckScheduledTask = new HealthCheckScheduledTask(this.uebServers);
-
- logger.trace("Exit init method of DistributionEngineClusterHealth");
-
- }
-
- @PreDestroy
- private void destroy() {
-
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- scheduledFuture = null;
- }
-
- if (healthCheckScheduler != null) {
- healthCheckScheduler.shutdown();
- }
-
- }
-
- /**
- * Start health check task.
- *
- * @param envNamePerStatus
- * @param startTask
- */
- public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus, boolean startTask) {
- this.envNamePerStatus = envNamePerStatus;
-
- if (startTask == true && this.scheduledFuture == null) {
- this.scheduledFuture = this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
- }
- }
-
- public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus) {
- startHealthCheckTask(envNamePerStatus, true);
- }
-
- private void logAlarm(boolean lastHealthState) {
- if (lastHealthState == true) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeHealthCheckRecovery, UEB_HEALTH_CHECK_STR);
- BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterRecovery(UEB_HEALTH_CHECK_STR);
- } else {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeHealthCheckError, UEB_HEALTH_CHECK_STR);
- BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterError(UEB_HEALTH_CHECK_STR);
- }
- }
-
- public HealthCheckInfo getHealthCheckInfo() {
- return healthCheckInfo;
- }
-
- /**
- * change the health check to DISABLE
- */
- public void setHealthCheckUebIsDisabled() {
- healthCheckInfo = HealthCheckInfoResult.DISABLED.getHealthCheckInfo();
- }
-
- /**
- * change the health check to NOT CONFGIURED
- */
- public void setHealthCheckUebConfigurationError() {
- healthCheckInfo = HealthCheckInfoResult.NOT_CONFIGURED.getHealthCheckInfo();
- }
-
- public void setHealthCheckOkAndReportInCaseLastStateIsDown() {
-
- if (lastHealthState == true) {
- return;
- }
- synchronized (lockOject) {
- if (lastHealthState == false) {
- logger.debug("Going to update health check state to available");
- lastHealthState = true;
- healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo();
- logAlarm(lastHealthState);
- }
- }
-
- }
+ if (envNamePerStatus != null) {
+ Collection<AtomicBoolean> values = envNamePerStatus.values();
+ if (values != null) {
+ for (AtomicBoolean status : values) {
+ if (true == status.get()) {
+ healthStatus = true;
+ break;
+ }
+ }
+ }
+ }
+
+ return healthStatus;
+ }
+
+ /**
+ * executor for the query itself
+ */
+ ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "UEB-Health-Check-Thread");
+ }
+ });
+
+ /**
+ * go all UEB servers and send a get apiKeys query. In case a query is succeed, no query is sent to the rest of UEB servers.
+ *
+ *
+ * @return
+ */
+ private boolean queryUeb() {
+
+ Boolean result = false;
+ int retryNumber = 1;
+ for (UebHealthCheckCall healthCheckCall : healthCheckCalls) {
+ try {
+
+ healthLogger.debug("Before running Health Check retry query number {} towards UEB server {}", retryNumber, healthCheckCall.getServer());
+
+ Future<Boolean> future = healthCheckExecutor.submit(healthCheckCall);
+ result = future.get(healthCheckReadTimeout, TimeUnit.SECONDS);
+
+ healthLogger.debug("After running Health Check retry query number {} towards UEB server {}. Result is {}", retryNumber, healthCheckCall.getServer(), result);
+
+ if (result != null && true == result.booleanValue()) {
+ break;
+ }
+
+ } catch (Exception e) {
+ String message = e.getMessage();
+ if (message == null) {
+ message = e.getClass().getName();
+ }
+ healthLogger.debug("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message);
+ healthLogger.trace("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message, e);
+ }
+ retryNumber++;
+
+ }
+
+ return result;
+
+ }
+
+ public List<UebHealthCheckCall> getHealthCheckCalls() {
+ return healthCheckCalls;
+ }
+
+ }
+
+ @PostConstruct
+ protected void init() {
+
+ logger.trace("Enter init method of DistributionEngineClusterHealth");
+
+ Long reconnectIntervalConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReconnectIntervalInSeconds();
+ if (reconnectIntervalConfig != null) {
+ reconnectInterval = reconnectIntervalConfig.longValue();
+ }
+ Long healthCheckReadTimeoutConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReadTimeout();
+ if (healthCheckReadTimeoutConfig != null) {
+ healthCheckReadTimeout = healthCheckReadTimeoutConfig.longValue();
+ }
+
+ DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+
+ this.uebServers = distributionEngineConfiguration.getUebServers();
+ this.publicApiKey = distributionEngineConfiguration.getUebPublicKey();
+
+ this.healthCheckScheduledTask = new HealthCheckScheduledTask(this.uebServers);
+
+ logger.trace("Exit init method of DistributionEngineClusterHealth");
+
+ }
+
+ @PreDestroy
+ protected void destroy() {
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ scheduledFuture = null;
+ }
+
+ if (healthCheckScheduler != null) {
+ healthCheckScheduler.shutdown();
+ }
+
+ }
+
+ /**
+ * Start health check task.
+ *
+ * @param envNamePerStatus
+ * @param startTask
+ */
+ public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus, boolean startTask) {
+ this.envNamePerStatus = envNamePerStatus;
+
+ if (startTask == true && this.scheduledFuture == null) {
+ this.scheduledFuture = this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
+ }
+ }
+
+ public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus) {
+ startHealthCheckTask(envNamePerStatus, true);
+ }
+
+ private void logAlarm(boolean lastHealthState) {
+ if (lastHealthState == true) {
+ BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterRecovery(UEB_HEALTH_CHECK_STR);
+ } else {
+ BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterError(UEB_HEALTH_CHECK_STR);
+ }
+ }
+
+ public HealthCheckInfo getHealthCheckInfo() {
+ return healthCheckInfo;
+ }
+
+ /**
+ * change the health check to DISABLE
+ */
+ public void setHealthCheckUebIsDisabled() {
+ healthCheckInfo = HealthCheckInfoResult.DISABLED.getHealthCheckInfo();
+ }
+
+ /**
+ * change the health check to NOT CONFGIURED
+ */
+ public void setHealthCheckUebConfigurationError() {
+ healthCheckInfo = HealthCheckInfoResult.NOT_CONFIGURED.getHealthCheckInfo();
+ }
+
+ public void setHealthCheckOkAndReportInCaseLastStateIsDown() {
+
+ if (lastHealthState == true) {
+ return;
+ }
+ synchronized (lockOject) {
+ if (lastHealthState == false) {
+ logger.debug("Going to update health check state to available");
+ lastHealthState = true;
+ healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo();
+ logAlarm(lastHealthState);
+ }
+ }
+
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index 1eeaa1229e..1759f69b3e 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,274 +20,268 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import fj.data.Either;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.config.EcompErrorName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import fj.data.Either;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
public class DistributionEngineInitTask implements Runnable {
- public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
- public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
- public static final String CONSUMER = "CONSUMER";
- public static final String PRODUCER = "PRODUCER";
- public static final String CREATED = "CREATED";
- public static final String FAILED = "FAILED";
- public static final Integer HTTP_OK = 200;
-
- private Long delayBeforeStartFlow = 0l;
- private DistributionEngineConfiguration deConfiguration;
- private String envName;
- private long retryInterval;
- private long currentRetryInterval;
- private long maxInterval;
- // private boolean active = false;
- boolean maximumRetryInterval = false;
- private AtomicBoolean status = null;
- ComponentsUtils componentsUtils = null;
- DistributionEnginePollingTask distributionEnginePollingTask = null;
-
- private CambriaHandler cambriaHandler = new CambriaHandler();
-
- public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) {
- super();
- this.delayBeforeStartFlow = delayBeforeStartFlow;
- this.deConfiguration = deConfiguration;
- this.envName = envName;
- this.retryInterval = deConfiguration.getInitRetryIntervalSec();
- this.currentRetryInterval = retryInterval;
- this.maxInterval = deConfiguration.getInitMaxIntervalSec();
- this.status = status;
- this.componentsUtils = componentsUtils;
- this.distributionEnginePollingTask = distributionEnginePollingTask;
- }
-
- private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-
- private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName());
-
- ScheduledFuture<?> scheduledFuture = null;
-
- public void startTask() {
- if (scheduledExecutorService != null) {
- Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
- logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
- this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
-
- }
- }
-
- public void restartTask() {
-
- this.stopTask();
-
- logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
-
- long lastCurrentInterval = currentRetryInterval;
- incrementRetryInterval();
-
- this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
-
- }
-
- protected void incrementRetryInterval() {
- if (currentRetryInterval < maxInterval) {
- currentRetryInterval *= 2;
- if (currentRetryInterval > maxInterval) {
- setMaxRetryInterval();
- }
- } else {
- setMaxRetryInterval();
- }
- }
-
- private void setMaxRetryInterval() {
- currentRetryInterval = maxInterval;
- maximumRetryInterval = true;
- logger.debug("Set next retry init interval to {}", maxInterval);
- }
-
- public void stopTask() {
- if (scheduledFuture != null) {
- boolean result = scheduledFuture.cancel(true);
- logger.debug("Stop reinit task. result = {}", result);
- if (false == result) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
- }
- scheduledFuture = null;
- }
- }
-
- public void destroy() {
- this.stopTask();
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdown();
- }
- }
-
- @Override
- public void run() {
-
- boolean result = false;
- result = initFlow();
-
- if (true == result) {
- this.stopTask();
- this.status.set(true);
- if (this.distributionEnginePollingTask != null) {
- String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
- logger.debug("start polling distribution status topic {}", topicName);
- this.distributionEnginePollingTask.startTask(topicName);
- }
- } else {
- if (false == maximumRetryInterval) {
- this.restartTask();
- }
- }
- }
-
- /**
- * run initialization flow
- *
- * @return
- */
- public boolean initFlow() {
-
- logger.trace("Start init flow for environment {}", this.envName);
-
- Set<String> topicsList = null;
- Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-
- getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers());
- if (getTopicsRes.isRight()) {
- CambriaErrorResponse status = getTopicsRes.right().value();
- if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
- topicsList = new HashSet<>();
- } else {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
- return false;
- }
- } else {
- topicsList = getTopicsRes.left().value();
- }
-
- String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
- logger.debug("Going to handle topic {}", notificationTopic);
-
- boolean status = createTopicIfNotExists(topicsList, notificationTopic);
- if (false == status) {
- return false;
- }
-
- CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-
- CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-
- if (createStatus != CambriaOperationStatus.OK) {
- return false;
- }
-
- String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
- logger.debug("Going to handle topic {}", statusTopic);
- status = createTopicIfNotExists(topicsList, statusTopic);
- if (false == status) {
- return false;
- }
-
- CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-
- if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
- return false;
- }
-
- return true;
- }
-
- private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
- CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType);
-
- String role = CONSUMER;
- if (subscriberType == SubscriberTypeEnum.PRODUCER) {
- role = PRODUCER;
- }
- auditRegistration(topicName, registerStatus, role);
- return registerStatus;
- }
-
- private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
- if (componentsUtils != null) {
- Integer httpCode = registerProducerStatus.getHttpCode();
- String httpCodeStr = String.valueOf(httpCode);
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr);
- }
- }
-
- private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
-
- if (topicsList.contains(topicName)) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
- }
- return true;
- }
-
- CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
- deConfiguration.getCreateTopic().getReplicationCount());
-
- CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
- if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
- }
- } else if (status == CambriaOperationStatus.OK) {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
- }
- } else {
- if (componentsUtils != null) {
- this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
- }
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
- return false;
- }
-
- return true;
- }
-
- public static String buildTopicName(String topicName, String environment) {
- return topicName + "-" + environment.toUpperCase();
- }
-
- public boolean isActive() {
- return this.status.get();
- }
-
- public long getCurrentRetryInterval() {
- return currentRetryInterval;
- }
-
- protected void setCambriaHandler(CambriaHandler cambriaHandler) {
- this.cambriaHandler = cambriaHandler;
- }
+ public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
+ public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
+ public static final String CONSUMER = "CONSUMER";
+ public static final String PRODUCER = "PRODUCER";
+ public static final String CREATED = "CREATED";
+ public static final String FAILED = "FAILED";
+ public static final Integer HTTP_OK = 200;
+
+ private Long delayBeforeStartFlow = 0l;
+ private DistributionEngineConfiguration deConfiguration;
+ private String envName;
+ private long retryInterval;
+ private long currentRetryInterval;
+ private long maxInterval;
+ boolean maximumRetryInterval = false;
+ private AtomicBoolean status = null;
+ ComponentsUtils componentsUtils = null;
+ DistributionEnginePollingTask distributionEnginePollingTask = null;
+ private OperationalEnvironmentEntry environmentEntry;
+
+ private CambriaHandler cambriaHandler = new CambriaHandler();
+
+ public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
+ super();
+ this.delayBeforeStartFlow = delayBeforeStartFlow;
+ this.deConfiguration = deConfiguration;
+ this.envName = envName;
+ this.retryInterval = deConfiguration.getInitRetryIntervalSec();
+ this.currentRetryInterval = retryInterval;
+ this.maxInterval = deConfiguration.getInitMaxIntervalSec();
+ this.status = status;
+ this.componentsUtils = componentsUtils;
+ this.distributionEnginePollingTask = distributionEnginePollingTask;
+ this.environmentEntry = environmentEntry;
+ }
+
+ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class);
+
+ ScheduledFuture<?> scheduledFuture = null;
+
+ public void startTask() {
+ if (scheduledExecutorService != null) {
+ Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
+ logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
+ this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
+
+ }
+ }
+
+ public void restartTask() {
+
+ this.stopTask();
+
+ logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
+
+ long lastCurrentInterval = currentRetryInterval;
+ incrementRetryInterval();
+
+ this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
+
+ }
+
+ protected void incrementRetryInterval() {
+ if (currentRetryInterval < maxInterval) {
+ currentRetryInterval *= 2;
+ if (currentRetryInterval > maxInterval) {
+ setMaxRetryInterval();
+ }
+ } else {
+ setMaxRetryInterval();
+ }
+ }
+
+ private void setMaxRetryInterval() {
+ currentRetryInterval = maxInterval;
+ maximumRetryInterval = true;
+ logger.debug("Set next retry init interval to {}", maxInterval);
+ }
+
+ public void stopTask() {
+ if (scheduledFuture != null) {
+ boolean result = scheduledFuture.cancel(true);
+ logger.debug("Stop reinit task. result = {}", result);
+ if (false == result) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
+ }
+ scheduledFuture = null;
+ }
+ }
+
+ public void destroy() {
+ this.stopTask();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdown();
+ }
+ }
+
+ @Override
+ public void run() {
+
+ boolean result = false;
+ result = initFlow();
+
+ if (true == result) {
+ this.stopTask();
+ this.status.set(true);
+ if (this.distributionEnginePollingTask != null) {
+ String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
+ logger.debug("start polling distribution status topic {}", topicName);
+ this.distributionEnginePollingTask.startTask(topicName);
+ }
+ } else {
+ if (false == maximumRetryInterval) {
+ this.restartTask();
+ }
+ }
+ }
+
+ /**
+ * run initialization flow
+ *
+ * @return
+ */
+ public boolean initFlow() {
+
+ logger.trace("Start init flow for environment {}", this.envName);
+
+ Set<String> topicsList = null;
+ Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
+
+ getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
+ if (getTopicsRes.isRight()) {
+ CambriaErrorResponse status = getTopicsRes.right().value();
+ if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+ topicsList = new HashSet<>();
+ } else {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+ return false;
+ }
+ } else {
+ topicsList = getTopicsRes.left().value();
+ }
+
+ String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", notificationTopic);
+
+ boolean status = createTopicIfNotExists(topicsList, notificationTopic);
+ if (false == status) {
+ return false;
+ }
+
+ CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
+
+ CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+
+ if (createStatus != CambriaOperationStatus.OK) {
+ return false;
+ }
+
+ String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", statusTopic);
+ status = createTopicIfNotExists(topicsList, statusTopic);
+ if (false == status) {
+ return false;
+ }
+
+ CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+
+ if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
+ CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
+
+ String role = CONSUMER;
+ if (subscriberType == SubscriberTypeEnum.PRODUCER) {
+ role = PRODUCER;
+ }
+ auditRegistration(topicName, registerStatus, role);
+ return registerStatus;
+ }
+
+ private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
+ if (componentsUtils != null) {
+ Integer httpCode = registerProducerStatus.getHttpCode();
+ String httpCodeStr = String.valueOf(httpCode);
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr);
+ }
+ }
+
+ private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
+
+ if (topicsList.contains(topicName)) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+ }
+ return true;
+ }
+
+ CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
+ deConfiguration.getCreateTopic().getReplicationCount());
+
+ CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
+ if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+ }
+ } else if (status == CambriaOperationStatus.OK) {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
+ }
+ } else {
+ if (componentsUtils != null) {
+ this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
+ }
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
+ return false;
+ }
+
+ return true;
+ }
+
+ public static String buildTopicName(String topicName, String environment) {
+ return topicName + "-" + environment.toUpperCase();
+ }
+
+ public boolean isActive() {
+ return this.status.get();
+ }
+
+ public long getCurrentRetryInterval() {
+ return currentRetryInterval;
+ }
+
+ protected void setCambriaHandler(CambriaHandler cambriaHandler) {
+ this.cambriaHandler = cambriaHandler;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index fc7c473d6b..b4f4863284 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -20,188 +20,189 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import fj.data.Either;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.config.EcompErrorName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import fj.data.Either;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class DistributionEnginePollingTask implements Runnable {
- public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
-
- private String topicName;
- private ComponentsUtils componentUtils;
- private int fetchTimeoutInSec = 15;
- private int pollingIntervalInSec;
- private String consumerId;
- private String consumerGroup;
- private DistributionEngineConfiguration distributionEngineConfiguration;
-
- private CambriaHandler cambriaHandler = new CambriaHandler();
- private Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
- private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
-
- private static Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class.getName());
-
- ScheduledFuture<?> scheduledFuture = null;
- private CambriaConsumer cambriaConsumer = null;
-
- private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
-
- public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, String envName, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth) {
-
- this.componentUtils = componentUtils;
- this.distributionEngineConfiguration = distributionEngineConfiguration;
- DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
- this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
- this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
- this.consumerGroup = statusConfig.getConsumerGroup();
- this.consumerId = statusConfig.getConsumerId();
- this.distributionEngineClusterHealth = distributionEngineClusterHealth;
- }
-
- public void startTask(String topicName) {
-
- this.topicName = topicName;
- logger.debug("start task for polling topic {}", topicName);
- if (fetchTimeoutInSec < 15) {
- logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
- fetchTimeoutInSec = 15;
- }
- try {
- cambriaConsumer = cambriaHandler.createConsumer(distributionEngineConfiguration.getUebServers(), topicName, distributionEngineConfiguration.getUebPublicKey(), distributionEngineConfiguration.getUebSecretKey(), consumerId, consumerGroup,
- fetchTimeoutInSec * 1000);
-
- if (scheduledPollingService != null) {
- logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
- scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
-
- }
- } catch (Exception e) {
- logger.debug("unexpected error occured", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
- }
- }
-
- public void stopTask() {
- if (scheduledFuture != null) {
- boolean result = scheduledFuture.cancel(true);
- logger.debug("Stop polling task. result = {}", result);
- if (false == result) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
- }
- scheduledFuture = null;
- }
- if (cambriaConsumer != null) {
- logger.debug("close consumer");
- cambriaHandler.closeConsumer(cambriaConsumer);
- }
-
- }
-
- public void destroy() {
- this.stopTask();
- shutdownExecutor();
- }
-
- @Override
- public void run() {
- logger.trace("run() method. polling queue {}", topicName);
-
- try {
- // init error
- if (cambriaConsumer == null) {
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- stopTask();
- return;
- }
-
- Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
- // fetch error
- if (fetchResult.isRight()) {
- CambriaErrorResponse errorResponse = fetchResult.right().value();
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
-
- // TODO: if status== internal error (connection problem) change
- // state to inactive
- // in next try, if succeed - change to active
- return;
- }
-
- // success
- Iterable<String> messages = fetchResult.left().value();
- for (String message : messages) {
- logger.trace("received message {}", message);
- try {
- DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
- componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
- String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
-
- distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
-
- } catch (Exception e) {
- logger.debug("failed to convert message to object", e);
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
- }
-
- }
- } catch (Exception e) {
- logger.debug("unexpected error occured", e);
- String methodName = new Object() {
- }.getClass().getEnclosingMethod().getName();
-
- BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
- BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
- }
-
- }
-
- private void shutdownExecutor() {
- if (scheduledPollingService == null)
- return;
-
- scheduledPollingService.shutdown(); // Disable new tasks from being
- // submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
- scheduledPollingService.shutdownNow(); // Cancel currently
- // executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
- logger.debug("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- scheduledPollingService.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
+ public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
+
+ private String topicName;
+ private ComponentsUtils componentUtils;
+ private int fetchTimeoutInSec = 15;
+ private int pollingIntervalInSec;
+ private String consumerId;
+ private String consumerGroup;
+
+ private CambriaHandler cambriaHandler = new CambriaHandler();
+ private Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ private DistributionCompleteReporter distributionCompleteReporter;
+
+ private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class);
+
+ ScheduledFuture<?> scheduledFuture = null;
+ private CambriaConsumer cambriaConsumer = null;
+
+ private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
+
+ private OperationalEnvironmentEntry environmentEntry;
+
+ public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
+
+ this.componentUtils = componentUtils;
+ DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
+ this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
+ this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
+ this.consumerGroup = statusConfig.getConsumerGroup();
+ this.consumerId = statusConfig.getConsumerId();
+ this.distributionEngineClusterHealth = distributionEngineClusterHealth;
+ this.environmentEntry = environmentEntry;
+ this.distributionCompleteReporter = distributionCompleteReporter;
+ }
+
+ public void startTask(String topicName) {
+
+ this.topicName = topicName;
+ logger.debug("start task for polling topic {}", topicName);
+ if (fetchTimeoutInSec < 15) {
+ logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
+ fetchTimeoutInSec = 15;
+ }
+ try {
+ cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
+ fetchTimeoutInSec * 1000);
+
+ if (scheduledPollingService != null) {
+ logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
+ scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
+
+ }
+ } catch (Exception e) {
+ logger.debug("unexpected error occured", e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ }
+ }
+
+ public void stopTask() {
+ if (scheduledFuture != null) {
+ boolean result = scheduledFuture.cancel(true);
+ logger.debug("Stop polling task. result = {}", result);
+ if (false == result) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
+ }
+ scheduledFuture = null;
+ }
+ if (cambriaConsumer != null) {
+ logger.debug("close consumer");
+ cambriaHandler.closeConsumer(cambriaConsumer);
+ }
+
+ }
+
+ public void destroy() {
+ this.stopTask();
+ shutdownExecutor();
+ }
+
+ @Override
+ public void run() {
+ logger.trace("run() method. polling queue {}", topicName);
+
+ try {
+ // init error
+ if (cambriaConsumer == null) {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+ stopTask();
+ return;
+ }
+
+ Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+ // fetch error
+ if (fetchResult.isRight()) {
+ CambriaErrorResponse errorResponse = fetchResult.right().value();
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
+
+ // TODO: if status== internal error (connection problem) change
+ // state to inactive
+ // in next try, if succeed - change to active
+ return;
+ }
+
+ // success
+ Iterable<String> messages = fetchResult.left().value();
+ for (String message : messages) {
+ logger.trace("received message {}", message);
+ try {
+ DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
+ handleDistributionNotificationMsg(notification);
+ distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
+ } catch (Exception e) {
+ logger.debug("failed to convert message to object", e);
+ BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
+ }
+
+ }
+ } catch (Exception e) {
+ logger.debug("unexpected error occured", e);
+ String methodName = new Object() {
+ }.getClass().getEnclosingMethod().getName();
+
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
+ }
+
+ }
+
+ private void handleDistributionNotificationMsg(DistributionStatusNotification notification) {
+ componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
+ String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
+ if (notification.isDistributionCompleteNotification()) {
+ distributionCompleteReporter.reportDistributionComplete(notification);
+ }
+ }
+
+ private void shutdownExecutor() {
+ if (scheduledPollingService == null)
+ return;
+
+ scheduledPollingService.shutdown(); // Disable new tasks from being
+ // submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduledPollingService.shutdownNow(); // Cancel currently
+ // executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
+ logger.debug("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ scheduledPollingService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
index 16a0a1dc31..62af4b8514 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
@@ -20,20 +20,13 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-
-import javax.annotation.PreDestroy;
-
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
-import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionNotificationTopicConfig;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
import org.openecomp.sdc.be.impl.ComponentsUtils;
import org.openecomp.sdc.be.model.Service;
-import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
-import org.openecomp.sdc.be.model.operations.impl.InterfaceLifecycleOperation;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.util.ThreadLocalsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -41,79 +34,83 @@ import org.springframework.stereotype.Component;
@Component("distributionNotificationSender")
public class DistributionNotificationSender {
- protected static final String DISTRIBUTION_NOTIFICATION_SENDING = "distributionNotificationSending";
-
- private static Logger logger = LoggerFactory.getLogger(DistributionNotificationSender.class.getName());
-
- // final String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/";
- // final String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL
- // + "resources/%s/%s/artifacts/%s";
- // final String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s";
-
- @javax.annotation.Resource
- InterfaceLifecycleOperation interfaceLifecycleOperation;
-
- @javax.annotation.Resource
- protected ComponentsUtils componentUtils;
-
- ExecutorService executorService = null;
-
- CambriaHandler cambriaHandler = new CambriaHandler();
-
- NotificationExecutorService notificationExecutorService = new NotificationExecutorService();
-
- public DistributionNotificationSender() {
- super();
-
- DistributionNotificationTopicConfig distributionNotificationTopic = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionNotificationTopic();
-
- executorService = notificationExecutorService.createExcecutorService(distributionNotificationTopic);
- }
-
- @PreDestroy
- public void shutdown() {
- logger.debug("Going to close notificationExecutorService");
- if (executorService != null) {
-
- long maxWaitingTime = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds();
-
- notificationExecutorService.shutdownAndAwaitTermination(executorService, maxWaitingTime + 1);
- }
- }
-
- public StorageOperationStatus sendNotification(String topicName, String distributionId, DistributionEngineConfiguration deConfiguration, String envName, INotificationData notificationData, Service service, String userId, String modifierName) {
-
- Runnable task = new PublishNotificationRunnable(envName, distributionId, service, notificationData, deConfiguration, topicName, userId, modifierName, cambriaHandler, componentUtils, ThreadLocalsHolder.getUuid());
- try {
- executorService.submit(task);
- } catch (RejectedExecutionException e) {
- logger.warn("Failed to submit task. Number of threads exceeeds", e);
- return StorageOperationStatus.OVERLOAD;
- }
-
- return StorageOperationStatus.OK;
- }
-
- /**
- * Audit the publishing notification in case of internal server error.
- *
- * @param topicName
- * @param status
- * @param distributionId
- * @param envName
- */
- private void auditDistributionNotificationInternalServerError(String topicName, StorageOperationStatus status, String distributionId, String envName) {
-
- if (this.componentUtils != null) {
- this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, "", " ", "Service", " ", " ", " ", envName, " ", topicName, distributionId, "Error: Internal Server Error. " + status, " ");
- }
- }
-
- protected CambriaErrorResponse publishNotification(INotificationData data, DistributionEngineConfiguration deConfiguration, String topicName) {
-
- CambriaErrorResponse status = cambriaHandler.sendNotification(topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebServers(), data);
+ protected static final String DISTRIBUTION_NOTIFICATION_SENDING = "distributionNotificationSending";
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributionNotificationSender.class);
+
+ @javax.annotation.Resource
+ protected ComponentsUtils componentUtils;
+ private CambriaHandler cambriaHandler = new CambriaHandler();
+ private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+
+ public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData, INotificationData notificationData, Service service, String userId, String modifierName) {
+ long startTime = System.currentTimeMillis();
+ CambriaErrorResponse status = cambriaHandler.sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), messageBusData.getDmaaPuebEndpoints(), notificationData,
+ deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
+ auditDistributionNotification(topicName, distributionId, status, service, messageBusData.getEnvId(), userId, modifierName, notificationData.getWorkloadContext(), messageBusData.getTenant());
+ long endTime = System.currentTimeMillis();
+ logger.debug("After building and publishing artifacts object. Total took {} milliseconds", (endTime - startTime));
+ return convertCambriaResponse(status);
+ }
+
+ private void auditDistributionNotification(String topicName, String distributionId, CambriaErrorResponse status, Service service, String envId, String userId, String modifierName, String workloadContext, String tenant) {
+ if (this.componentUtils != null) {
+ Integer httpCode = status.getHttpCode();
+ String httpCodeStr = String.valueOf(httpCode);
+
+ String desc = getDescriptionFromErrorResponse(status);
+
+ this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, service.getUUID(), service.getName(), "Service", service.getVersion(), userId, modifierName, envId, service.getLifecycleState().name(), topicName,
+ distributionId, desc, httpCodeStr, workloadContext, tenant);
+ }
+ }
+
+ private String getDescriptionFromErrorResponse(CambriaErrorResponse status) {
+
+ CambriaOperationStatus operationStatus = status.getOperationStatus();
+
+ switch (operationStatus) {
+ case OK:
+ return "OK";
+ case AUTHENTICATION_ERROR:
+ return "Error: Authentication problem towards U-EB server";
+ case INTERNAL_SERVER_ERROR:
+ return "Error: Internal U-EB server error";
+ case UNKNOWN_HOST_ERROR:
+ return "Error: Cannot reach U-EB server host";
+ case CONNNECTION_ERROR:
+ return "Error: Cannot connect to U-EB server";
+ case OBJECT_NOT_FOUND:
+ return "Error: object not found in U-EB server";
+ default:
+ return "Error: Internal Cambria server problem";
+
+ }
+
+ }
+
+ private ActionStatus convertCambriaResponse(CambriaErrorResponse status) {
+ CambriaOperationStatus operationStatus = status.getOperationStatus();
+
+ switch (operationStatus) {
+ case OK:
+ return ActionStatus.OK;
+ case AUTHENTICATION_ERROR:
+ return ActionStatus.AUTHENTICATION_ERROR;
+ case INTERNAL_SERVER_ERROR:
+ return ActionStatus.GENERAL_ERROR;
+ case UNKNOWN_HOST_ERROR:
+ return ActionStatus.UNKNOWN_HOST;
+ case CONNNECTION_ERROR:
+ return ActionStatus.CONNNECTION_ERROR;
+ case OBJECT_NOT_FOUND:
+ return ActionStatus.OBJECT_NOT_FOUND;
+ default:
+ return ActionStatus.GENERAL_ERROR;
+
+ }
+ }
- return status;
- }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java
index 73a0336361..006aa26082 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java
@@ -22,63 +22,67 @@ package org.openecomp.sdc.be.components.distribution.engine;
public class DistributionStatusNotification {
- String distributionID;
- String consumerID;
- long timestamp;
- String artifactURL;
- DistributionStatusNotificationEnum status;
- String errorReason;
-
- public String getDistributionID() {
- return distributionID;
- }
-
- public void setDistributionID(String distributionId) {
- this.distributionID = distributionId;
- }
-
- public String getConsumerID() {
- return consumerID;
- }
-
- public void setConsumerID(String consumerId) {
- this.consumerID = consumerId;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public String getArtifactURL() {
- return artifactURL;
- }
-
- public void setArtifactURL(String artifactURL) {
- this.artifactURL = artifactURL;
- }
-
- public DistributionStatusNotificationEnum getStatus() {
- return status;
- }
-
- public void setStatus(DistributionStatusNotificationEnum status) {
- this.status = status;
- }
-
- public String getErrorReason() {
- return errorReason;
- }
-
- public void setErrorReason(String errorReason) {
- this.errorReason = errorReason;
- }
-
- @Override
- public String toString() {
- return "DistributionStatusNotification [distributionId=" + distributionID + ", consumerId=" + consumerID + ", timestamp=" + timestamp + ", artifactURL=" + artifactURL + ", status=" + status + ", errorReason=" + errorReason + "]";
- }
+ String distributionID;
+ String consumerID;
+ long timestamp;
+ String artifactURL;
+ DistributionStatusNotificationEnum status;
+ String errorReason;
+
+ public String getDistributionID() {
+ return distributionID;
+ }
+
+ public void setDistributionID(String distributionId) {
+ this.distributionID = distributionId;
+ }
+
+ public String getConsumerID() {
+ return consumerID;
+ }
+
+ public void setConsumerID(String consumerId) {
+ this.consumerID = consumerId;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getArtifactURL() {
+ return artifactURL;
+ }
+
+ public void setArtifactURL(String artifactURL) {
+ this.artifactURL = artifactURL;
+ }
+
+ public DistributionStatusNotificationEnum getStatus() {
+ return status;
+ }
+
+ public void setStatus(DistributionStatusNotificationEnum status) {
+ this.status = status;
+ }
+
+ public String getErrorReason() {
+ return errorReason;
+ }
+
+ public void setErrorReason(String errorReason) {
+ this.errorReason = errorReason;
+ }
+
+ public boolean isDistributionCompleteNotification() {
+ return DistributionStatusNotificationEnum.DISTRIBUTION_COMPLETE_OK.equals(status) || DistributionStatusNotificationEnum.DISTRIBUTION_COMPLETE_ERROR.equals(status);
+ }
+
+ @Override
+ public String toString() {
+ return "DistributionStatusNotification [distributionId=" + distributionID + ", consumerId=" + consumerID + ", timestamp=" + timestamp + ", artifactURL=" + artifactURL + ", status=" + status + ", errorReason=" + errorReason + "]";
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java
index bd77f3915a..65aa18ee97 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java
@@ -22,5 +22,5 @@ package org.openecomp.sdc.be.components.distribution.engine;
public enum DistributionStatusNotificationEnum {
- DOWNLOAD_OK, DOWNLOAD_ERROR, ALREADY_DOWNLOADED, DEPLOY_OK, DEPLOY_ERROR, ALREADY_DEPLOYED, NOTIFIED, NOT_NOTIFIED
+ DOWNLOAD_OK, DOWNLOAD_ERROR, ALREADY_DOWNLOADED, DEPLOY_OK, DEPLOY_ERROR, ALREADY_DEPLOYED, NOTIFIED, NOT_NOTIFIED, COMPONENT_DONE_ERROR, COMPONENT_DONE_OK, DISTRIBUTION_COMPLETE_OK, DISTRIBUTION_COMPLETE_ERROR
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java
new file mode 100644
index 0000000000..3d35a84d7a
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java
@@ -0,0 +1,91 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import fj.data.Either;
+import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
+import org.openecomp.sdc.security.SecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+
+/**
+ * Allows to create DMAAP client of type MRConsumer according received configuration parameters
+ */
+@Component("dmaapClientFactory")
+public class DmaapClientFactory {
+ private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class);
+
+ /**
+ * Creates DMAAP consumer according to received parameters
+ * @param parameters
+ * @return an instance object of type MRConsumer
+ * @throws IOException
+ */
+ public MRConsumer create(DmaapConsumerConfiguration parameters) throws Exception {
+ MRConsumer consumer = MRClientFactory.createConsumer(buildProperties(parameters));
+ logger.info("MRConsumer created for topic {}", parameters.getTopic());
+ return consumer;
+ }
+
+ private Properties buildProperties(DmaapConsumerConfiguration parameters) throws Exception{
+ Properties props = new Properties();
+ Either<String,String> passkey = SecurityUtil.INSTANCE.decrypt(parameters.getCredential().getPassword() );
+ if (passkey.isRight()){
+ throw new GeneralSecurityException("invalid password, cannot build properties");
+ }
+ props.setProperty("Latitude", Double.toString(parameters.getLatitude()));
+ props.setProperty("Longitude", Double.toString(parameters.getLongitude()));
+ props.setProperty("Version", parameters.getVersion());
+ props.setProperty("ServiceName", parameters.getServiceName());
+ props.setProperty("Environment", parameters.getEnvironment());
+ props.setProperty("Partner", parameters.getPartner());
+ props.setProperty("routeOffer", parameters.getRouteOffer());
+ props.setProperty("Protocol", parameters.getProtocol());
+ props.setProperty("username", parameters.getCredential().getUsername());
+ props.setProperty("password", passkey.left().value() );
+ props.setProperty("contenttype", parameters.getContenttype());
+ props.setProperty("host", parameters.getHosts());
+ props.setProperty("topic", parameters.getTopic());
+ props.setProperty("group", parameters.getConsumerGroup());
+ props.setProperty("id", parameters.getConsumerId());
+ props.setProperty("timeout", Integer.toString(parameters.getTimeoutMs()));
+ props.setProperty("limit", Integer.toString(parameters.getLimit()));
+ props.setProperty("AFT_DME2_REQ_TRACE_ON", Boolean.toString(parameters.isDme2TraceOn()));
+ props.setProperty("AFT_ENVIRONMENT", parameters.getAftEnvironment());
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", Integer.toString(parameters.getAftDme2ConnectionTimeoutMs()));
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", Integer.toString(parameters.getAftDme2RoundtripTimeoutMs()));
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", Integer.toString(parameters.getAftDme2ReadTimeoutMs()));
+
+ String dme2PreferredRouterFilePath = parameters.getDme2preferredRouterFilePath();
+ ensureFileExists(dme2PreferredRouterFilePath);
+ props.setProperty("DME2preferredRouterFilePath", dme2PreferredRouterFilePath);
+
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("MethodType", "GET");
+ props.setProperty("authKey", "");
+ props.setProperty("authDate", "");
+ props.setProperty("filter", "");
+ props.setProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", "");
+ props.setProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ return props;
+ }
+
+ private void ensureFileExists(String filePath) throws IOException {
+ File file = new File(filePath);
+ if(file.createNewFile()) {
+ logger.info("The file {} has been created on the disk", file.getAbsolutePath());
+ }
+ else{
+ logger.info("The file {} already exists", file.getAbsolutePath());
+ }
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
new file mode 100644
index 0000000000..e0661f4b22
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java
@@ -0,0 +1,76 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.att.nsa.mr.client.MRConsumer;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Allows consuming DMAAP topic according to received consumer parameters
+ * Allows processing received messages.
+ */
+@Service
+public class DmaapConsumer {
+ private final ExecutorFactory executorFactory;
+ private final DmaapClientFactory dmaapClientFactory;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class);
+
+ @Autowired
+ private DmaapHealth dmaapHealth;
+ /**
+ * Allows to create an object of type DmaapConsumer
+ * @param executorFactory
+ * @param dmaapClientFactory
+ */
+ @Autowired
+ public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
+ this.executorFactory = executorFactory;
+ this.dmaapClientFactory = dmaapClientFactory;
+ }
+
+ /**
+ * Allows consuming DMAAP topic according to received consumer parameters
+ * @param notificationReceived
+ * @param exceptionHandler
+ * @throws Exception
+ */
+ public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
+
+ DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
+ String topic = dmaapConsumerParams.getTopic();
+ logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
+ MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
+ ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
+ ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
+
+ pollExecutor.scheduleWithFixedDelay(() -> {
+ logger.info("Trying to fetch messages from topic: {}", topic);
+ boolean isTopicAvailable = false;
+ try {
+ Iterable<String> messages = consumer.fetch();
+ isTopicAvailable = true ;
+ if (messages != null) {
+ for (String msg : messages) {
+ logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
+ notificationExecutor.execute(() -> notificationReceived.accept(msg));
+ }
+ }
+ //successfully fetched
+ }
+ catch (Exception e) {
+ logger.error("The exception {} occured upon fetching DMAAP message", e);
+ }
+ dmaapHealth.report( isTopicAvailable );
+ }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
+ }
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java
new file mode 100644
index 0000000000..ba5e9f7c9c
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java
@@ -0,0 +1,220 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import org.apache.commons.validator.routines.UrlValidator;
+import org.apache.http.client.utils.URIUtils;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
+import org.openecomp.sdc.common.api.HealthCheckInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.commons.lang3.StringUtils.*;
+import static org.openecomp.sdc.common.api.Constants.HC_COMPONENT_DMAAP_ENGINE;
+
+@Component("dmaapHealth")
+public class DmaapHealth {
+
+
+ protected static final String DMAAP_HEALTH_LOG_CONTEXT = "dmaap.healthcheck";
+ private static final String DMAAP_HEALTH_CHECK_STR = "dmaapHealthCheck";
+ private static final Logger log = LoggerFactory.getLogger(DmaapHealth.class);
+ private static final Logger logHealth = LoggerFactory.getLogger(DMAAP_HEALTH_LOG_CONTEXT);
+ private HealthCheckInfo healthCheckInfo = DmaapHealth.HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo();
+ private long healthCheckReadTimeout = 20;
+ private long reconnectInterval = 5;
+ private HealthCheckScheduledTask healthCheckScheduledTask = null ;
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private ScheduledFuture<?> scheduledFuture = null;
+ private DmaapConsumerConfiguration configuration = null ;
+
+ private volatile AtomicBoolean lastHealthState = new AtomicBoolean(false);
+ private volatile AtomicBoolean reportedHealthState = null;
+
+ public enum HealthCheckInfoResult {
+ OK(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.UP, null, DmaapStatusDescription.OK.getDescription())),
+ UNAVAILABLE(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.UNAVAILABLE.getDescription())),
+ DOWN(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.DOWN.getDescription()));
+
+ private HealthCheckInfo healthCheckInfo;
+ HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) {
+ this.healthCheckInfo = healthCheckInfo;
+ }
+ public HealthCheckInfo getHealthCheckInfo() {
+ return healthCheckInfo;
+ }
+ }
+
+ public enum DmaapStatusDescription {
+ OK("OK"), UNAVAILABLE("Dmaap is not available"),DOWN("DOWN"), NOT_CONFIGURED("Dmaap configuration is missing/wrong ");
+
+ private String desc;
+ DmaapStatusDescription(String desc) {
+ this.desc = desc;
+ }
+ public String getDescription() {
+ return desc;
+ }
+
+ }
+
+ @PostConstruct
+ public DmaapHealth init() {
+ log.trace("Enter init method of Dmaap health");
+ synchronized (DmaapHealth.class){
+ this.configuration = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
+
+ Integer pollingInterval = configuration.getPollingInterval();
+ if (pollingInterval != null && pollingInterval!=0) {
+ reconnectInterval = pollingInterval;
+ }
+ Integer healthCheckReadTimeoutConfig = configuration.getTimeoutMs();
+ if (healthCheckReadTimeoutConfig != null) {
+ this.healthCheckReadTimeout = healthCheckReadTimeoutConfig;
+ }
+ this.healthCheckScheduledTask = new HealthCheckScheduledTask( configuration ); //what is the representation? csv? delimiter? json or other
+ startHealthCheckTask(true);
+ }
+ log.trace("Exit init method of DistributionEngineClusterHealth");
+ return this;
+ }
+
+ @PreDestroy
+ protected void destroy() {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ scheduledFuture = null;
+ }
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
+ }
+
+ /**
+ * Start health check task.
+ *
+ * @param startTask
+ */
+ public void startHealthCheckTask( boolean startTask ) {
+ synchronized (DmaapHealth.class){
+ if (startTask && this.scheduledFuture == null) {
+ this.scheduledFuture = this.scheduler.scheduleAtFixedRate(this.healthCheckScheduledTask , 0, reconnectInterval, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ public void report(Boolean isUp){
+ if (reportedHealthState == null)
+ reportedHealthState = new AtomicBoolean(isUp);
+ reportedHealthState.set(isUp);
+ }
+
+ public void logAlarm(boolean lastHealthState) {
+ try{
+ if ( lastHealthState ) {
+ BeEcompErrorManager.getInstance().logDmaapHealthCheckRecovery( DMAAP_HEALTH_CHECK_STR );
+ } else {
+ BeEcompErrorManager.getInstance().logDmaapHealthCheckError( DMAAP_HEALTH_CHECK_STR );
+ }
+ }catch( Exception e ){
+ log.debug("cannot logAlarm -> {}" ,e );
+ }
+ }
+
+ public DmaapConsumerConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public HealthCheckInfo getHealthCheckInfo() {
+ return healthCheckInfo;
+ }
+
+ /**
+ * Health Check Task Scheduler - infinite check.
+ */
+ public class HealthCheckScheduledTask implements Runnable {
+ private final DmaapConsumerConfiguration config;
+ private static final int timeout = 8192;
+
+ public HealthCheckScheduledTask(final DmaapConsumerConfiguration config){
+ this.config = config;
+ }
+ @Override
+ public void run() {
+ logHealth.trace("Executing Dmaap Health Check Task - Start");
+ boolean prevIsReachable = false;
+ boolean reachable = false;
+ //first try simple ping
+ try{
+ if ( reportedHealthState != null ){
+ reachable = reportedHealthState.get();
+ }
+ else{
+ reachable = false;
+ }
+ prevIsReachable = lastHealthState.getAndSet( reachable );
+ healthCheckInfo = reachable ? HealthCheckInfoResult.OK.healthCheckInfo : HealthCheckInfoResult.DOWN.healthCheckInfo;
+ }
+ catch( Exception e ){
+ log.debug("{} | cannot check connectivity -> {}",DMAAP_HEALTH_CHECK_STR, e );
+ prevIsReachable = lastHealthState.getAndSet(false);
+ healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.healthCheckInfo;
+ }
+ if (prevIsReachable != lastHealthState.get())
+ logAlarm( lastHealthState.get() );
+ }
+
+
+ /**
+ * @deprecated (health is reported outside from EnvironmentEngine consumer fetch)
+ */
+ @Deprecated
+ public boolean isICMPReachable( ) throws IOException{
+ try{
+ String hostname = getUrlHost(config.getHosts());
+ return InetAddress.getByName( hostname ).isReachable(timeout);
+ }catch( URISyntaxException e ){
+ log.debug("{} | malformed host configuration -> ",DMAAP_HEALTH_CHECK_STR , e);
+ }
+ return false;
+ }
+ }
+
+ public static String getUrlHost(String qualifiedHost) throws URISyntaxException{
+ //region - parse complex format ex. <http://URL:PORT>
+ try{
+ UrlValidator validator = new UrlValidator();
+ if (validator.isValid(qualifiedHost)){
+ return URIUtils.extractHost(new URI(qualifiedHost)).getHostName();
+ }else{
+ log.debug("{} | invalid url format, continuing ", DMAAP_HEALTH_CHECK_STR );
+ }
+ }catch(URISyntaxException e){
+ log.debug("{} | invalid url format, continuing {} ", DMAAP_HEALTH_CHECK_STR , e);
+ }
+ //endregion
+
+ //region - try shortcut format <URL> or <URL:PORT>
+ if ( countMatches( qualifiedHost , ":") <= 1){
+ String[] address = qualifiedHost.split(":");
+ if ( address.length>0 && isNotBlank(address[0]) ){
+ return address[0];
+ }
+ }
+ //endregion
+ throw new URISyntaxException( qualifiedHost , "invalid hostname, expecting a single <host:port> , (valid ex. www.google.com:80 | www.google.com | http:\\\\www.google.com:8181)");
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java
new file mode 100644
index 0000000000..3f86fe73de
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+/* Example {
+ "operationalEnvironmentId": "28122015552391",
+ "operationalEnvironmentName": "Operational Environment Name",
+ "operationalEnvironmentType": "ECOMP",
+ "tenantContext": "TEST",
+ "workloadContext": "ECOMP_E2E-IST",
+ "action": "Create"
+ }*/
+public class DmaapNotificationDataImpl implements IDmaapNotificationData, IDmaapAuditNotificationData {
+
+ private String operationalEnvironmentId;
+ private String operationalEnvironmentType;
+ private String action;
+ private String operationalEnvironmentName;
+ private String tenantContext;
+
+ @Override
+ public String getOperationalEnvironmentId() {
+ return operationalEnvironmentId;
+ }
+
+ @Override
+ public OperationaEnvironmentTypeEnum getOperationalEnvironmentType() {
+ return OperationaEnvironmentTypeEnum.findByName(operationalEnvironmentType);
+ }
+
+ @Override
+ public DmaapActionEnum getAction() {
+ return DmaapActionEnum.findByName(action);
+ }
+
+ @Override
+ public String getOperationalEnvironmentName() {
+ return operationalEnvironmentName;
+ }
+
+ @Override
+ public String getTenantContext() {
+ return tenantContext;
+ }
+
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java
new file mode 100644
index 0000000000..67977b6361
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java
@@ -0,0 +1,75 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * a pojo which holds all the necessary data to communicate with the message bus
+ * this class is a reflection ot the {@link OperationalEnvironmentEntry} class
+ */
+public class EnvironmentMessageBusData {
+
+ private List<String> dmaaPuebEndpoints;
+
+ private String uebPublicKey;
+
+ private String uebPrivateKey;
+
+ private String envId;
+
+ private String tenant;
+
+ public EnvironmentMessageBusData() {
+ }
+
+ public EnvironmentMessageBusData(OperationalEnvironmentEntry operationalEnvironment) {
+ this.dmaaPuebEndpoints = new ArrayList<>(operationalEnvironment.getDmaapUebAddress());
+ this.uebPublicKey = operationalEnvironment.getUebApikey();
+ this.uebPrivateKey = operationalEnvironment.getUebSecretKey();
+ this.envId = operationalEnvironment.getEnvironmentId();
+ this.tenant = operationalEnvironment.getTenant();
+ }
+
+ public String getTenant() {
+ return tenant;
+ }
+
+ public void setTenant(String tenant) {
+ this.tenant = tenant;
+ }
+
+ public List<String> getDmaaPuebEndpoints() {
+ return dmaaPuebEndpoints;
+ }
+
+ public void setDmaaPuebEndpoints(List<String> dmaaPuebEndpoints) {
+ this.dmaaPuebEndpoints = dmaaPuebEndpoints;
+ }
+
+ public String getUebPublicKey() {
+ return uebPublicKey;
+ }
+
+ public void setUebPublicKey(String uebPublicKey) {
+ this.uebPublicKey = uebPublicKey;
+ }
+
+ public String getUebPrivateKey() {
+ return uebPrivateKey;
+ }
+
+ public void setUebPrivateKey(String uebPrivateKey) {
+ this.uebPrivateKey = uebPrivateKey;
+ }
+
+ public String getEnvId() {
+ return envId;
+ }
+
+ public void setEnvId(String envId) {
+ this.envId = envId;
+ }
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java
new file mode 100644
index 0000000000..822464c631
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java
@@ -0,0 +1,526 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.att.aft.dme2.api.DME2Exception;
+import com.att.aft.dme2.iterator.DME2EndpointIterator;
+import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
+import com.att.aft.dme2.manager.registry.DME2Endpoint;
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import fj.data.Either;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpStatus;
+import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
+import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
+import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
+import org.openecomp.sdc.be.config.DmeConfiguration;
+import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
+import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
+import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
+import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.info.OperationalEnvInfo;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
+import org.openecomp.sdc.common.datastructure.Wrapper;
+import org.openecomp.sdc.common.http.client.api.HttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
+
+/**
+ * Allows to consume DMAAP topic and handle received notifications
+ */
+@Service
+public class EnvironmentsEngine implements INotificationHandler {
+
+ private static final String MESSAGE_BUS = "MessageBus";
+ private static final String UNKNOWN = "Unknown";
+ private static final Logger log = LoggerFactory.getLogger(EnvironmentsEngine.class);
+ private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
+
+ private Map<String, OperationalEnvironmentEntry> environments;
+ private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
+ private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
+ private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
+
+ private final DmaapConsumer dmaapConsumer;
+ private final OperationalEnvironmentDao operationalEnvironmentDao;
+ private final DME2EndpointIteratorCreator epIterCreator;
+ private final AaiRequestHandler aaiRequestHandler;
+ private final ComponentsUtils componentUtils;
+ private final CambriaHandler cambriaHandler;
+ private final DistributionEngineClusterHealth distributionEngineClusterHealth;
+ private final DistributionCompleteReporter distributionCompleteReporter;
+
+ public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
+ this.dmaapConsumer = dmaapConsumer;
+ this.operationalEnvironmentDao = operationalEnvironmentDao;
+ this.epIterCreator = epIterCreator;
+ this.aaiRequestHandler = aaiRequestHandler;
+ this.componentUtils = componentUtils;
+ this.cambriaHandler = cambriaHandler;
+ this.distributionEngineClusterHealth = distributionEngineClusterHealth;
+ this.distributionCompleteReporter = distributionCompleteReporter;
+ }
+
+ @VisibleForTesting
+ @PostConstruct
+ void init() {
+ log.trace("Environments engine has been initialized. ");
+ try {
+ environments = populateEnvironments();
+ createUebTopicsForEnvironments();
+ dmaapConsumer.consumeDmaapTopic(this::handleMessage,
+ (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
+ }
+ catch (Exception e) {
+ log.error("An error occurred upon consuming topic by Dmaap consumer client." , e);
+ }
+ }
+ public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
+ AtomicBoolean status,
+ Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
+ connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
+
+ }
+
+ public void connectUebTopicForDistributionConfTopic(String envName,
+ AtomicBoolean status,
+ Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
+ connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
+
+ }
+ /**
+ * Allows to create and run UEB initializing and polling tasks
+ * @param status
+ * @param envNamePerInitTask
+ * @param envNamePerPollingTask
+ * @param opEnvEntry
+ */
+ private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
+ Map<String, DistributionEngineInitTask> envNamePerInitTask,
+ Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
+
+ String envId = opEnvEntry.getEnvironmentId();
+
+ DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration();
+ DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
+ distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
+ opEnvEntry);
+ String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
+ DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
+ distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
+ opEnvEntry);
+ distributionEngineInitTask.startTask();
+ envNamePerInitTask.put(envId, distributionEngineInitTask);
+ envNamePerPollingTask.put(envId, distributionEnginePollingTask);
+
+ log.debug("Environment envId = {} has been connected to the UEB topic", envId);
+ }
+
+ @Override
+ public boolean handleMessage(String notification) {
+ DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
+ .getConfiguration().getDmaapConsumerConfiguration();
+ Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
+ Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
+ dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
+
+ boolean result;
+ if (eitherTimeOut.isRight()) {
+ result = false;
+ } else {
+ result = eitherTimeOut.left().value();
+ }
+ return result;
+ }
+
+ public boolean handleMessageLogic(String notification) {
+ Wrapper<Boolean> errorWrapper = new Wrapper<>();
+ Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
+ try {
+
+ log.debug("handle message - for operational environment notification received: {}", notification);
+ Gson gsonObj = new GsonBuilder().create();
+
+ IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
+ DmaapNotificationDataImpl.class);
+ IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
+ DmaapNotificationDataImpl.class);
+
+ AuditingActionEnum actionEnum;
+ switch(notificationData.getAction()) {
+ case CREATE:
+ actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
+ break;
+ case UPDATE:
+ actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
+ break;
+ case DELETE:
+ actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
+ break;
+ default:
+ actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
+ break;
+ }
+ componentUtils.auditEnvironmentEngine(actionEnum,
+ notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
+ notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
+ auditNotificationData.getTenantContext());
+
+ if (errorWrapper.isEmpty()) {
+ validateNotification(errorWrapper, notificationData, auditNotificationData);
+ }
+ // Perform Save In-Progress Dao
+ if (errorWrapper.isEmpty()) {
+ saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
+ }
+
+ if (errorWrapper.isEmpty()) {
+ buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
+ }
+
+ } catch (Exception e) {
+ log.debug("handle message for operational environmet failed for notification: {} with error :{}",
+ notification, e.getMessage(), e);
+ errorWrapper.setInnerElement(false);
+
+ }
+ return errorWrapper.isEmpty();
+ }
+
+ private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
+ IDmaapAuditNotificationData auditNotificationData) {
+ // Check OperationaEnvironmentType
+ if (errorWrapper.isEmpty()) {
+ validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
+ }
+ // Check Action Type
+ if (errorWrapper.isEmpty()) {
+ validateActionType(errorWrapper, notificationData);
+ }
+ // Check is valid for create/update (not In-Progress state)
+ if (errorWrapper.isEmpty()) {
+ validateState(errorWrapper, notificationData);
+ }
+ }
+
+ public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ // Get Env Info From A&AI
+ if (errorWrapper.isEmpty()) {
+ retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
+ }
+
+ if (errorWrapper.isEmpty()) {
+ // Get List Of UEB Addresses From AFT_DME
+ retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
+ }
+
+ // Create UEB keys and set them on EnvEntry
+ if (errorWrapper.isEmpty()) {
+ createUebKeys(errorWrapper, opEnvEntry);
+ }
+
+ // Create Topics
+ if (errorWrapper.isEmpty()) {
+ log.debug("handle message - Create Topics");
+ createUebTopicsForEnvironment(opEnvEntry);
+ }
+
+ // Save Status Complete and Add to Map
+ if (errorWrapper.isEmpty()) {
+ saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
+ }
+
+ // Update Environments Map
+ if (errorWrapper.isEmpty()) {
+ environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
+ }
+ else{
+ saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
+ }
+ }
+
+ private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ log.debug("handle message - save OperationalEnvironment Failed Status");
+ opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
+ saveOpEnvEntry(errorWrapper, opEnvEntry);
+ }
+
+ void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ log.debug("handle message - save OperationalEnvironment Complete Dao");
+ opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
+ saveOpEnvEntry(errorWrapper, opEnvEntry);
+
+ }
+
+ void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
+ try {
+ boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
+ if( isKeyFieldsValid ){
+ String opEnvKey = map2OpEnvKey(opEnvEntry);
+ String environmentId = opEnvEntry.getEnvironmentId();
+ List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
+ opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
+ }
+ else{
+ errorWrapper.setInnerElement(false);
+ log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
+ }
+
+ } catch (DME2Exception e) {
+ errorWrapper.setInnerElement(false);
+ log.error("Failed to retrieve Ueb Addresses From DME. ", e);
+ }
+ }
+
+ void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ log.debug("handle message - Create UEB keys");
+ List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
+ .collect(Collectors.toList());
+ Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
+ .createUebKeys(discoverEndPoints);
+ if (eitherCreateUebKeys.isRight()) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message - failed to create UEB Keys");
+ } else {
+ ApiCredential apiCredential = eitherCreateUebKeys.left().value();
+ opEnvEntry.setUebApikey(apiCredential.getApiKey());
+ opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
+ }
+ }
+
+ void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
+ log.debug("handle message - Get Env Info From A&AI");
+ Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
+ opEnvEntry.getEnvironmentId());
+ if (eitherOperationalEnvInfo.isRight()) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message - failed to retrieve details from A&AI");
+ } else {
+ OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
+ opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
+ opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
+ }
+ }
+
+ void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
+ log.debug("handle message - save OperationalEnvironment In-Progress Dao");
+ OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
+ // Entry Environment ID holds actually the environment NAME
+ opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
+ opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
+ opEnvEntry.setIsProduction(false);
+ saveOpEnvEntry(errorWrapper, opEnvEntry);
+ opEnvEntryWrapper.setInnerElement(opEnvEntry);
+
+ }
+
+
+ void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
+ log.debug("handle message - verify OperationalEnvironment not In-Progress");
+ String opEnvId = notificationData.getOperationalEnvironmentId();
+
+ Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
+ .get(opEnvId);
+ if (eitherOpEnv.isLeft()) {
+ final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
+ if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
+ }
+ } else {
+ CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
+ if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
+ errorWrapper.setInnerElement(false);
+ log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
+ operationStatus);
+ }
+ }
+
+ }
+
+ void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
+ log.debug("handle message - verify Action Type");
+ DmaapActionEnum action = notificationData.getAction();
+ if (action == DmaapActionEnum.DELETE) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
+ }
+ }
+
+ void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
+ IDmaapAuditNotificationData auditNotificationData) {
+ log.debug("handle message - verify OperationaEnvironmentType");
+ OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
+ if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
+ componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
+ notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
+ notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
+ auditNotificationData.getTenantContext());
+ }
+ }
+
+
+ private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
+ entry.setLastModified(new Date(System.currentTimeMillis()));
+ CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
+ if (saveStaus != CassandraOperationStatus.OK) {
+ errorWrapper.setInnerElement(false);
+ log.debug("handle message saving operational environmet failed for id :{} with error : {}",
+ entry.getEnvironmentId(), saveStaus);
+ }
+ }
+
+ public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception {
+ DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration();
+ List<String> uebHosts = new LinkedList<>();
+
+ String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey,
+ env);
+ DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
+
+ // Beginning iteration
+ while (iterator.hasNext()) {
+ DME2EndpointReference ref = iterator.next();
+ DME2Endpoint dmeEndpoint = ref.getEndpoint();
+ log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
+ uebHosts.add(dmeEndpoint.getHost());
+ }
+
+ return uebHosts;
+ }
+
+ private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
+ return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
+ }
+
+ private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
+ Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
+ OperationalEnvironmentEntry confEntry = readEnvFromConfig();
+ envs.put(confEntry.getEnvironmentId(), confEntry);
+ return envs;
+ }
+
+ private OperationalEnvironmentEntry readEnvFromConfig() {
+ OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
+ DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
+ .getDistributionEngineConfiguration();
+ entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
+ entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
+
+ Set<String> puebEndpoints = new HashSet<>();
+ puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
+ entry.setDmaapUebAddress(puebEndpoints);
+
+ String envName = distributionEngineConfiguration.getEnvironments().size() == 1
+ ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
+ entry.setEnvironmentId(envName);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Enviroment read from configuration: {}", entry.toString());
+ }
+
+ return entry;
+ }
+
+ private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
+ Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
+ .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
+
+ if (opEnvResult.isLeft()) {
+ Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
+ .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
+ resultMap.forEach( (key, value) -> log.debug("Enviroment loaded from DB: {}", value.toString()) );
+ return resultMap;
+ } else {
+ CassandraOperationStatus status = opEnvResult.right().value();
+ log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
+ return new HashMap<>();
+ }
+ }
+
+ void createUebTopicsForEnvironments() {
+ environments.values().forEach(this::createUebTopicsForEnvironment);
+ }
+
+ public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
+ String envId = opEnvEntry.getEnvironmentId();
+ log.debug("Create Environment {} on UEB Topic.", envId);
+ AtomicBoolean status = new AtomicBoolean(false);
+ envNamePerStatus.put(envId, status);
+
+ connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
+ }
+
+ @VisibleForTesting
+ void setConfigurationManager(ConfigurationManager configurationManager) {
+ this.configurationManager = configurationManager;
+ }
+
+ public Map<String, OperationalEnvironmentEntry> getEnvironments() {
+ return environments;
+ }
+
+
+ public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
+ HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
+ if (resp.getStatusCode() == HttpStatus.SC_OK) {
+ try {
+ OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
+
+ log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
+ return Either.left(operationalEnvInfo);
+ } catch (Exception e) {
+ log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
+ return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ }
+ } else {
+ log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
+ resp.getStatusCode(), resp.getDescription());
+ return Either.right(resp.getStatusCode());
+ }
+ }
+
+ public OperationalEnvironmentEntry getEnvironmentById (String envId) {
+ return environments.get(envId);
+ }
+
+ public boolean isInMap(OperationalEnvironmentEntry env) {
+ return isInMap(env.getEnvironmentId());
+ }
+
+ public boolean isInMap(String envId) {
+ return environments.containsKey(envId);
+ }
+
+ public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
+ environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
+
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java
new file mode 100644
index 0000000000..29579c64b7
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java
@@ -0,0 +1,43 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+@Component("executorFactory")
+/**
+ * Allows to create next kinds of single thread executors: SingleThreadExecutor and SingleThreadScheduledExecutor
+ */
+public class ExecutorFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(EnvironmentsEngine.class);
+
+ public ExecutorService create(String name, UncaughtExceptionHandler exceptionHandler){
+ logger.info("Going to create single thread executor. ");
+ ThreadFactory threadFactory = createThreadFactory(name, exceptionHandler);
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+ public ScheduledExecutorService createScheduled(String name){
+ logger.info("Going to create single thread scheduled executor. ");
+ ThreadFactory threadFactory = createThreadFactory(name,
+ (t, e) -> LoggerFactory.getLogger(UncaughtExceptionHandler.class).error("An error occurred: ", e));
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
+ private ThreadFactory createThreadFactory(String name, UncaughtExceptionHandler exceptionHandler) {
+ String nameFormat = name + "-%d";
+ return new ThreadFactoryBuilder()
+ .setThreadFactory(Executors.defaultThreadFactory())
+ .setNameFormat(nameFormat)
+ .setUncaughtExceptionHandler(exceptionHandler)
+ .setDaemon(true)
+ .build();
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java
index 169f4f3efa..4a917710a9 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java
@@ -24,40 +24,40 @@ import org.openecomp.sdc.common.api.ArtifactTypeEnum;
public interface IArtifactInfo {
- /** Artifact File name */
- String getArtifactName();
-
- /**
- * Artifact Type.<br>
- * Following are valid values : HEAT , DG_XML. <br>
- * List of values will be extended in post-1510 releases.
- */
- ArtifactTypeEnum getArtifactType();
-
- /**
- * Relative artifact's URL. Should be used in REST GET API to download the artifact's payload.<br>
- * The full artifact URL will be in the following format :<br>
- * https://{serverBaseURL}/{resourcePath}<br>
- * serverBaseURL - Hostname ( ASDC LB FQDN) + optional port <br>
- * resourcePath - "artifactURL" <br>
- * Ex : https://asdc.sdc.com/v1/catalog/services/srv1/2.0/resources/aaa/1.0/artifacts/aaa.yml
- */
- String getArtifactURL();
-
- /**
- * Base-64 encoded MD5 checksum of the artifact's payload.<br>
- * Should be used for data integrity validation when an artifact's payload is downloaded.<br>
- */
- String getArtifactChecksum();
-
- /**
- * Installation timeout. Used by the orchestrator.
- */
- Integer getArtifactTimeout();
-
- /**
- * Artifact description
- */
- String getArtifactDescription();
+ /** Artifact File name */
+ String getArtifactName();
+
+ /**
+ * Artifact Type.<br>
+ * Following are valid values : HEAT , DG_XML. <br>
+ * List of values will be extended in post-1510 releases.
+ */
+ ArtifactTypeEnum getArtifactType();
+
+ /**
+ * Relative artifact's URL. Should be used in REST GET API to download the artifact's payload.<br>
+ * The full artifact URL will be in the following format :<br>
+ * https://{serverBaseURL}/{resourcePath}<br>
+ * serverBaseURL - Hostname ( ASDC LB FQDN) + optional port <br>
+ * resourcePath - "artifactURL" <br>
+ * Ex : https://asdc.sdc.com/v1/catalog/services/srv1/2.0/resources/aaa/1.0/artifacts/aaa.yml
+ */
+ String getArtifactURL();
+
+ /**
+ * Base-64 encoded MD5 checksum of the artifact's payload.<br>
+ * Should be used for data integrity validation when an artifact's payload is downloaded.<br>
+ */
+ String getArtifactChecksum();
+
+ /**
+ * Installation timeout. Used by the orchestrator.
+ */
+ Integer getArtifactTimeout();
+
+ /**
+ * Artifact description
+ */
+ String getArtifactDescription();
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java
index a27156616b..96abfe087c 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,26 +20,35 @@
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
import org.openecomp.sdc.be.model.Service;
import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
-
-import fj.data.Either;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
public interface IDistributionEngine {
- public boolean isActive();
+ boolean isActive();
+
+ ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName);
+
+ ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName, String userId, String modifierName);
+
+ StorageOperationStatus isEnvironmentAvailable(String envName);
+
+ StorageOperationStatus isEnvironmentAvailable();
- public StorageOperationStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName);
+ /**
+ * Currently, it used for tests. For real implementation we need cancel the initialization task and the polling task.
+ *
+ * @param envName
+ */
+ void disableEnvironment(String envName);
- public StorageOperationStatus isEnvironmentAvailable(String envName);
+ StorageOperationStatus isReadyForDistribution(Service service, String envName);
- /**
- * Currently, it used for tests. For real implementation we need cancel the initialization task and the polling task.
- *
- * @param envName
- */
- public void disableEnvironment(String envName);
+ INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext);
- public Either<INotificationData, StorageOperationStatus> isReadyForDistribution(Service service, String distributionId, String envName);
+ StorageOperationStatus verifyServiceHasDeploymentArtifacts(Service service);
+ OperationalEnvironmentEntry getEnvironmentById(String opEnvId);
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java
new file mode 100644
index 0000000000..c1b7a313fb
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java
@@ -0,0 +1,6 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+public interface IDmaapAuditNotificationData {
+ String getOperationalEnvironmentName();
+ String getTenantContext();
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java
new file mode 100644
index 0000000000..7b974e8a96
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.getEnumValueByFieldValue;
+
+public interface IDmaapNotificationData {
+ String getOperationalEnvironmentId();
+
+ OperationaEnvironmentTypeEnum getOperationalEnvironmentType();
+
+ DmaapActionEnum getAction();
+
+
+
+
+
+ enum DmaapActionEnum {
+ DELETE("Delete"),
+ CREATE("Create"),
+ UPDATE("Update"),
+ UNKONW("UNKONW")
+
+ ;
+ private String actionName;
+
+ private DmaapActionEnum(String actionName) {
+ this.actionName = actionName;
+ }
+
+ public String getActionName() {
+ return actionName;
+ }
+
+ public static DmaapActionEnum findByName(String actionName){
+ return getEnumValueByFieldValue(actionName, DmaapActionEnum.values(), DmaapActionEnum::getActionName, UNKONW, false);
+ }
+ };
+ enum OperationaEnvironmentTypeEnum {
+ ECOMP("ECOMP"),
+ UNKONW("UNKONW")
+ ;
+ private String eventTypenName;
+
+ private OperationaEnvironmentTypeEnum(String eventTypenName) {
+ this.eventTypenName = eventTypenName;
+ }
+
+ public String getEventTypenName() {
+ return eventTypenName;
+ }
+
+ public static OperationaEnvironmentTypeEnum findByName(String operationalEnvironmentTypeName){
+ return getEnumValueByFieldValue(operationalEnvironmentTypeName, OperationaEnvironmentTypeEnum.values(), OperationaEnvironmentTypeEnum::getEventTypenName, UNKONW, false);
+ }
+ };
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java
index d631724701..d66f8f92f1 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java
@@ -23,84 +23,84 @@ package org.openecomp.sdc.be.components.distribution.engine;
import java.util.List;
public interface INotificationData {
- /**
- * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.<br>
- * Generated UUID is compliant with RFC 4122.<br>
- * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
- * Ex.: AA97B177-9383-4934-8543-0F91A7A02836
- */
- String getDistributionID();
-
- /** Logical Service Name. */
- String getServiceName();
-
- /**
- * Service Version.<br>
- * Two dot (".") separated digit blocks.<br>
- * Ex. : "2.0"
- */
- String getServiceVersion();
-
- /**
- * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br>
- * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
- * Ex. : AA97B177-9383-4934-8543-0F91A7A02836
- */
- String getServiceUUID();
-
- /**
- * Service description
- */
- String getServiceDescription();
-
- /**
- * ServiceInvariant UUID
- */
- String getServiceInvariantUUID();
-
- /** List of the resource instances */
- List<JsonContainerResourceInstance> getResources();
-
- /** List of the artifacts */
- List<ArtifactInfoImpl> getServiceArtifacts();
-
- String getWorkloadContext();
-
- void setDistributionID(String distributionId);
-
- /** Logical Service Name. */
- void setServiceName(String serviceName);
-
- /**
- * Service Version.<br>
- * Two dot (".") separated digit blocks.<br>
- * Ex. : "2.0"
- */
- void setServiceVersion(String serviceVersion);
-
- /**
- * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br>
- * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
- * Ex. : AA97B177-9383-4934-8543-0F91A7A02836
- */
- void setServiceUUID(String serviceUUID);
-
- /**
- * Service Description
- */
- void setServiceDescription(String serviceDescription);
-
- /**
- * ServiceInvariant UUID
- */
- void setServiceInvariantUUID(String serviceInvariantUuid);
-
- /** List of the Resource Instances */
- void setResources(List<JsonContainerResourceInstance> resource);
-
- /** List of the Resource Instances */
- void setServiceArtifacts(List<ArtifactInfoImpl> artifacts);
-
- void setWorkloadContext(String workloadContext);
+ /**
+ * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.<br>
+ * Generated UUID is compliant with RFC 4122.<br>
+ * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
+ * Ex.: AA97B177-9383-4934-8543-0F91A7A02836
+ */
+ String getDistributionID();
+
+ /** Logical Service Name. */
+ String getServiceName();
+
+ /**
+ * Service Version.<br>
+ * Two dot (".") separated digit blocks.<br>
+ * Ex. : "2.0"
+ */
+ String getServiceVersion();
+
+ /**
+ * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br>
+ * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
+ * Ex. : AA97B177-9383-4934-8543-0F91A7A02836
+ */
+ String getServiceUUID();
+
+ /**
+ * Service description
+ */
+ String getServiceDescription();
+
+ /**
+ * ServiceInvariant UUID
+ */
+ String getServiceInvariantUUID();
+
+ /** List of the resource instances */
+ List<JsonContainerResourceInstance> getResources();
+
+ /** List of the artifacts */
+ List<ArtifactInfoImpl> getServiceArtifacts();
+
+ String getWorkloadContext();
+
+ void setDistributionID(String distributionId);
+
+ /** Logical Service Name. */
+ void setServiceName(String serviceName);
+
+ /**
+ * Service Version.<br>
+ * Two dot (".") separated digit blocks.<br>
+ * Ex. : "2.0"
+ */
+ void setServiceVersion(String serviceVersion);
+
+ /**
+ * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br>
+ * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
+ * Ex. : AA97B177-9383-4934-8543-0F91A7A02836
+ */
+ void setServiceUUID(String serviceUUID);
+
+ /**
+ * Service Description
+ */
+ void setServiceDescription(String serviceDescription);
+
+ /**
+ * ServiceInvariant UUID
+ */
+ void setServiceInvariantUUID(String serviceInvariantUuid);
+
+ /** List of the Resource Instances */
+ void setResources(List<JsonContainerResourceInstance> resource);
+
+ /** List of the Resource Instances */
+ void setServiceArtifacts(List<ArtifactInfoImpl> artifacts);
+
+ void setWorkloadContext(String workloadContext);
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java
new file mode 100644
index 0000000000..a1936c61dd
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java
@@ -0,0 +1,11 @@
+package org.openecomp.sdc.be.components.distribution.engine;
+
+public interface INotificationHandler {
+ /**
+ * Allows to handle received topic message
+ * @param notification
+ * @return true if finished successfully otherwise false
+ */
+ public boolean handleMessage(String notification);
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java
index 9a77b9f94f..deac8751b3 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java
@@ -22,18 +22,18 @@ package org.openecomp.sdc.be.components.distribution.engine;
public interface IResourceArtifactInfo extends IArtifactInfo {
- /** resource name */
- String getResourceName();
+ /** resource name */
+ String getResourceName();
- /** resource version */
- String getResourceVersion();
+ /** resource version */
+ String getResourceVersion();
- /**
- * Global UUID of the resource that specific artifact belongs to.<br>
- * It is generated by ASDC per each resource version.<br>
- * Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-"). <br>
- * Ex.: AA97B177-9383-4934-8543-0F91A7A02836
- */
- String getResourceUUID();
+ /**
+ * Global UUID of the resource that specific artifact belongs to.<br>
+ * It is generated by ASDC per each resource version.<br>
+ * Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-"). <br>
+ * Ex.: AA97B177-9383-4934-8543-0F91A7A02836
+ */
+ String getResourceUUID();
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java
index 5efcfe7fa8..db0e1e938d 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java
@@ -20,109 +20,129 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.List;
-
import org.openecomp.sdc.be.model.ComponentInstance;
+import java.util.List;
+
public class JsonContainerResourceInstance {
- private String resourceInstanceName, resourceName, resourceVersion, resoucreType, resourceUUID, resourceInvariantUUID, resourceCustomizationUUID, category, subcategory;
- private List<ArtifactInfoImpl> artifacts;
-
- public JsonContainerResourceInstance(ComponentInstance resourceInstance, String resourceType, List<ArtifactInfoImpl> artifacts) {
- super();
- this.resourceInstanceName = resourceInstance.getName();
- this.resourceName = resourceInstance.getComponentName();
- this.resourceVersion = resourceInstance.getComponentVersion();
- this.resoucreType = resourceType;
- this.resourceUUID = resourceInstance.getComponentUid();
- this.artifacts = artifacts;
- this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID();
- }
-
- public String getResourceInstanceName() {
- return resourceInstanceName;
- }
-
- public void setResourceInstanceName(String resourceInstanceName) {
- this.resourceInstanceName = resourceInstanceName;
- }
-
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
- public String getResourceVersion() {
- return resourceVersion;
- }
-
- public void setResourceVersion(String resourceVersion) {
- this.resourceVersion = resourceVersion;
- }
-
- public String getResoucreType() {
- return resoucreType;
- }
-
- public void setResoucreType(String resoucreType) {
- this.resoucreType = resoucreType;
- }
-
- public String getResourceUUID() {
- return resourceUUID;
- }
-
- public void setResourceUUID(String resourceUUID) {
- this.resourceUUID = resourceUUID;
- }
-
- public List<ArtifactInfoImpl> getArtifacts() {
- return artifacts;
- }
-
- public void setArtifacts(List<ArtifactInfoImpl> artifacts) {
- this.artifacts = artifacts;
- }
-
- public String getResourceInvariantUUID() {
- return resourceInvariantUUID;
- }
-
- public void setResourceInvariantUUID(String resourceInvariantUUID) {
- this.resourceInvariantUUID = resourceInvariantUUID;
- }
-
- public String getResourceCustomizationUUID() {
- return resourceCustomizationUUID;
- }
-
- public void setResourceCustomizationUUID(String customizationUUID) {
- this.resourceCustomizationUUID = customizationUUID;
- }
-
- public String getCategory() {
- return category;
- }
-
- public void setCategory(String category) {
- this.category = category;
- }
-
- public String getSubcategory() {
- return subcategory;
- }
-
- public void setSubcategory(String subcategory) {
- this.subcategory = subcategory;
- }
-
- @Override
- public String toString() {
- return "JsonContainerResourceInstance [resourceInstanceName=" + resourceInstanceName + ", resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resoucreType=" + resoucreType + ", resourceUUID=" + resourceUUID
- + ", resourceInvariantUUID=" + resourceInvariantUUID + ", resourceCustomizationUUID=" + resourceCustomizationUUID + ", category=" + category + ", subcategory=" + subcategory + ", artifacts=" + artifacts + "]";
- }
-
+ private String resourceInstanceName;
+ private String resourceName;
+ private String resourceVersion;
+ private String resourceType;
+ private String resourceUUID;
+ private String resourceInvariantUUID;
+ private String resourceCustomizationUUID;
+ private String category;
+ private String subcategory;
+ private List<ArtifactInfoImpl> artifacts;
+
+ public JsonContainerResourceInstance(ComponentInstance resourceInstance, String resourceType, List<ArtifactInfoImpl> artifacts) {
+ super();
+ this.resourceInstanceName = resourceInstance.getName();
+ this.resourceName = resourceInstance.getComponentName();
+ this.resourceVersion = resourceInstance.getComponentVersion();
+ this.resourceType = resourceType;
+ this.resourceUUID = resourceInstance.getComponentUid();
+ this.artifacts = artifacts;
+ this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID();
+ }
+
+ public JsonContainerResourceInstance(ComponentInstance resourceInstance, List<ArtifactInfoImpl> artifacts) {
+ super();
+ this.resourceInstanceName = resourceInstance.getName();
+ this.resourceName = resourceInstance.getComponentName();
+ this.resourceVersion = resourceInstance.getComponentVersion();
+ if(resourceInstance.getOriginType() != null)
+ this.resourceType = resourceInstance.getOriginType().getValue();
+ this.resourceUUID = resourceInstance.getComponentUid();
+ this.artifacts = artifacts;
+ this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID();
+ }
+
+ public String getResourceInstanceName() {
+ return resourceInstanceName;
+ }
+
+ public void setResourceInstanceName(String resourceInstanceName) {
+ this.resourceInstanceName = resourceInstanceName;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
+
+ public String getResourceVersion() {
+ return resourceVersion;
+ }
+
+ public void setResourceVersion(String resourceVersion) {
+ this.resourceVersion = resourceVersion;
+ }
+
+ public String getResoucreType() {
+ return resourceType;
+ }
+
+ public void setResoucreType(String resoucreType) {
+ this.resourceType = resoucreType;
+ }
+
+ public String getResourceUUID() {
+ return resourceUUID;
+ }
+
+ public void setResourceUUID(String resourceUUID) {
+ this.resourceUUID = resourceUUID;
+ }
+
+ public List<ArtifactInfoImpl> getArtifacts() {
+ return artifacts;
+ }
+
+ public void setArtifacts(List<ArtifactInfoImpl> artifacts) {
+ this.artifacts = artifacts;
+ }
+
+ public String getResourceInvariantUUID() {
+ return resourceInvariantUUID;
+ }
+
+ public void setResourceInvariantUUID(String resourceInvariantUUID) {
+ this.resourceInvariantUUID = resourceInvariantUUID;
+ }
+
+ public String getResourceCustomizationUUID() {
+ return resourceCustomizationUUID;
+ }
+
+ public void setResourceCustomizationUUID(String customizationUUID) {
+ this.resourceCustomizationUUID = customizationUUID;
+ }
+
+ public String getCategory() {
+ return category;
+ }
+
+ public void setCategory(String category) {
+ this.category = category;
+ }
+
+ public String getSubcategory() {
+ return subcategory;
+ }
+
+ public void setSubcategory(String subcategory) {
+ this.subcategory = subcategory;
+ }
+
+ @Override
+ public String toString() {
+ return "JsonContainerResourceInstance [resourceInstanceName=" + resourceInstanceName + ", resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resoucreType=" + resourceType + ", resourceUUID=" + resourceUUID
+ + ", resourceInvariantUUID=" + resourceInvariantUUID + ", resourceCustomizationUUID=" + resourceCustomizationUUID + ", category=" + category + ", subcategory=" + subcategory + ", artifacts=" + artifacts + "]";
+ }
+
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java
index 353039647d..1db67a9581 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java
@@ -24,111 +24,111 @@ import java.util.List;
public class NotificationDataImpl implements INotificationData {
- private String distributionID;
- private String serviceName;
- private String serviceVersion;
- private String serviceUUID;
- private String serviceDescription;
- private String serviceInvariantUUID;
- private List<JsonContainerResourceInstance> resources;
- private List<ArtifactInfoImpl> serviceArtifacts;
- private String workloadContext;
-
- @Override
- public String getDistributionID() {
- return distributionID;
- }
-
- @Override
- public String getServiceName() {
- return serviceName;
- }
-
- @Override
- public String getServiceVersion() {
- return serviceVersion;
- }
-
- @Override
- public String getServiceUUID() {
- return serviceUUID;
- }
-
- public void setDistributionID(String distributionID) {
- this.distributionID = distributionID;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
- public void setServiceVersion(String serviceVersion) {
- this.serviceVersion = serviceVersion;
- }
-
- public void setServiceUUID(String serviceUUID) {
- this.serviceUUID = serviceUUID;
- }
-
- public String getServiceDescription() {
- return serviceDescription;
- }
-
- public void setServiceDescription(String serviceDescription) {
- this.serviceDescription = serviceDescription;
- }
- @Override
- public String getWorkloadContext() { return workloadContext; }
-
- @Override
- public void setWorkloadContext(String workloadContext) { this.workloadContext = workloadContext; }
-
- @Override
- public String toString() {
- return "NotificationDataImpl{" +
- "distributionID='" + distributionID + '\'' +
- ", serviceName='" + serviceName + '\'' +
- ", serviceVersion='" + serviceVersion + '\'' +
- ", serviceUUID='" + serviceUUID + '\'' +
- ", serviceDescription='" + serviceDescription + '\'' +
- ", serviceInvariantUUID='" + serviceInvariantUUID + '\'' +
- ", resources=" + resources +
- ", serviceArtifacts=" + serviceArtifacts +
- ", workloadContext='" + workloadContext + '\'' +
- '}';
- }
-
- @Override
- public List<JsonContainerResourceInstance> getResources() {
- return resources;
- }
-
- @Override
- public void setResources(List<JsonContainerResourceInstance> resources) {
- this.resources = resources;
-
- }
-
- @Override
- public List<ArtifactInfoImpl> getServiceArtifacts() {
- // TODO Auto-generated method stub
- return serviceArtifacts;
- }
-
- @Override
- public void setServiceArtifacts(List<ArtifactInfoImpl> serviceArtifacts) {
- this.serviceArtifacts = serviceArtifacts;
-
- }
-
- @Override
- public String getServiceInvariantUUID() {
- return serviceInvariantUUID;
- }
-
- @Override
- public void setServiceInvariantUUID(String serviceInvariantUUID) {
- this.serviceInvariantUUID = serviceInvariantUUID;
- }
+ private String distributionID;
+ private String serviceName;
+ private String serviceVersion;
+ private String serviceUUID;
+ private String serviceDescription;
+ private String serviceInvariantUUID;
+ private List<JsonContainerResourceInstance> resources;
+ private List<ArtifactInfoImpl> serviceArtifacts;
+ private String workloadContext;
+
+ @Override
+ public String getDistributionID() {
+ return distributionID;
+ }
+
+ @Override
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override
+ public String getServiceVersion() {
+ return serviceVersion;
+ }
+
+ @Override
+ public String getServiceUUID() {
+ return serviceUUID;
+ }
+
+ public void setDistributionID(String distributionID) {
+ this.distributionID = distributionID;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public void setServiceVersion(String serviceVersion) {
+ this.serviceVersion = serviceVersion;
+ }
+
+ public void setServiceUUID(String serviceUUID) {
+ this.serviceUUID = serviceUUID;
+ }
+
+ public String getServiceDescription() {
+ return serviceDescription;
+ }
+
+ public void setServiceDescription(String serviceDescription) {
+ this.serviceDescription = serviceDescription;
+ }
+ @Override
+ public String getWorkloadContext() { return workloadContext; }
+
+ @Override
+ public void setWorkloadContext(String workloadContext) { this.workloadContext = workloadContext; }
+
+ @Override
+ public String toString() {
+ return "NotificationDataImpl{" +
+ "distributionID='" + distributionID + '\'' +
+ ", serviceName='" + serviceName + '\'' +
+ ", serviceVersion='" + serviceVersion + '\'' +
+ ", serviceUUID='" + serviceUUID + '\'' +
+ ", serviceDescription='" + serviceDescription + '\'' +
+ ", serviceInvariantUUID='" + serviceInvariantUUID + '\'' +
+ ", resources=" + resources +
+ ", serviceArtifacts=" + serviceArtifacts +
+ ", workloadContext='" + workloadContext + '\'' +
+ '}';
+ }
+
+ @Override
+ public List<JsonContainerResourceInstance> getResources() {
+ return resources;
+ }
+
+ @Override
+ public void setResources(List<JsonContainerResourceInstance> resources) {
+ this.resources = resources;
+
+ }
+
+ @Override
+ public List<ArtifactInfoImpl> getServiceArtifacts() {
+ // TODO Auto-generated method stub
+ return serviceArtifacts;
+ }
+
+ @Override
+ public void setServiceArtifacts(List<ArtifactInfoImpl> serviceArtifacts) {
+ this.serviceArtifacts = serviceArtifacts;
+
+ }
+
+ @Override
+ public String getServiceInvariantUUID() {
+ return serviceInvariantUUID;
+ }
+
+ @Override
+ public void setServiceInvariantUUID(String serviceInvariantUUID) {
+ this.serviceInvariantUUID = serviceInvariantUUID;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java
index 74fbb2c660..dc58a24e5f 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java
@@ -20,62 +20,57 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionNotificationTopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.*;
public class NotificationExecutorService {
- private static Logger logger = LoggerFactory.getLogger(NotificationExecutorService.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(NotificationExecutorService.class);
- public ExecutorService createExcecutorService(DistributionNotificationTopicConfig distributionNotificationTopic) {
+ public ExecutorService createExcecutorService(DistributionNotificationTopicConfig distributionNotificationTopic) {
- Integer minThreadPoolSize = distributionNotificationTopic.getMinThreadPoolSize();
- if (minThreadPoolSize == null) {
- minThreadPoolSize = 0;
- }
+ Integer minThreadPoolSize = distributionNotificationTopic.getMinThreadPoolSize();
+ if (minThreadPoolSize == null) {
+ minThreadPoolSize = 0;
+ }
- Integer maxThreadPoolSize = distributionNotificationTopic.getMaxThreadPoolSize();
- if (maxThreadPoolSize == null) {
- maxThreadPoolSize = 10;
- }
+ Integer maxThreadPoolSize = distributionNotificationTopic.getMaxThreadPoolSize();
+ if (maxThreadPoolSize == null) {
+ maxThreadPoolSize = 10;
+ }
- ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
- threadFactoryBuilder.setNameFormat("distribution-notification-thread-%d");
- ThreadFactory threadFactory = threadFactoryBuilder.build();
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+ threadFactoryBuilder.setNameFormat("distribution-notification-thread-%d");
+ ThreadFactory threadFactory = threadFactoryBuilder.build();
- ExecutorService executorService = new ThreadPoolExecutor(minThreadPoolSize, maxThreadPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
+ ExecutorService executorService = new ThreadPoolExecutor(minThreadPoolSize, maxThreadPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
- return executorService;
- }
+ return executorService;
+ }
- public void shutdownAndAwaitTermination(ExecutorService pool, long maxTimeToWait) {
+ public void shutdownAndAwaitTermination(ExecutorService pool, long maxTimeToWait) {
- logger.debug("shutdown NotificationExecutorService");
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) {
- logger.debug("Failed to close executor service");
- }
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
+ logger.debug("shutdown NotificationExecutorService");
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) {
+ logger.debug("Failed to close executor service");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java
deleted file mode 100644
index c283ecc92b..0000000000
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.be.components.distribution.engine;
-
-import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
-import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
-import org.openecomp.sdc.be.impl.ComponentsUtils;
-import org.openecomp.sdc.be.model.Service;
-import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.util.ThreadLocalsHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PublishNotificationRunnable implements Runnable {
-
- private String envName;
- private String distributionId;
- private Service service;
- private INotificationData data;
- private DistributionEngineConfiguration deConfiguration;
- private String topicName;
- private CambriaHandler cambriaHandler;
- private ComponentsUtils componentUtils;
- private String userId;
- private String modifierName;
- private String requestId;
-
- private static Logger logger = LoggerFactory.getLogger(PublishNotificationRunnable.class.getName());
-
- public PublishNotificationRunnable(String envName, String distributionId, Service service, INotificationData data, DistributionEngineConfiguration deConfiguration, String topicName, String userId, String modifierName,
- CambriaHandler cambriaHandler, ComponentsUtils componentUtils, String requestId) {
- super();
- this.envName = envName;
- this.distributionId = distributionId;
- this.service = service;
- this.data = data;
- this.deConfiguration = deConfiguration;
- this.topicName = topicName;
- this.cambriaHandler = cambriaHandler;
- this.componentUtils = componentUtils;
- this.userId = userId;
- this.modifierName = modifierName;
- this.requestId = requestId;
- }
-
- public INotificationData getData() {
- return data;
- }
-
- public void setData(INotificationData data) {
- this.data = data;
- }
-
- public DistributionEngineConfiguration getDeConfiguration() {
- return deConfiguration;
- }
-
- public void setDeConfiguration(DistributionEngineConfiguration deConfiguration) {
- this.deConfiguration = deConfiguration;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public String getModifierName() {
- return modifierName;
- }
-
- public void setModifierName(String modifierName) {
- this.modifierName = modifierName;
- }
-
- @Override
- public void run() {
-
- long startTime = System.currentTimeMillis();
- ThreadLocalsHolder.setUuid(this.requestId);
-
- CambriaErrorResponse status = cambriaHandler.sendNotificationAndClose(topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebServers(), data,
- deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
-
- logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
- auditDistributionNotification(topicName, status, service, distributionId, envName, userId, modifierName);
-
- long endTime = System.currentTimeMillis();
- logger.debug("After building and publishing artifacts object. Total took {} milliseconds", (endTime - startTime));
-
- }
-
- private void auditDistributionNotification(String topicName, CambriaErrorResponse status, Service service, String distributionId, String envName, String userId, String modifierName) {
- if (this.componentUtils != null) {
- Integer httpCode = status.getHttpCode();
- String httpCodeStr = String.valueOf(httpCode);
-
- String desc = getDescriptionFromErrorResponse(status);
-
- this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, service.getUUID(), service.getName(), "Service", service.getVersion(), userId, modifierName, envName, service.getLifecycleState().name(), topicName,
- distributionId, desc, httpCodeStr);
- }
- }
-
- private String getDescriptionFromErrorResponse(CambriaErrorResponse status) {
-
- CambriaOperationStatus operationStatus = status.getOperationStatus();
-
- switch (operationStatus) {
- case OK:
- return "OK";
- case AUTHENTICATION_ERROR:
- return "Error: Authentication problem towards U-EB server";
- case INTERNAL_SERVER_ERROR:
- return "Error: Internal U-EB server error";
- case UNKNOWN_HOST_ERROR:
- return "Error: Cannot reach U-EB server host";
- case CONNNECTION_ERROR:
- return "Error: Cannot connect to U-EB server";
- case OBJECT_NOT_FOUND:
- return "Error: object not found in U-EB server";
- default:
- return "Error: Internal Cambria server problem";
-
- }
-
- }
-}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java
index 31f3cb6fda..19a857a115 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java
@@ -22,37 +22,37 @@ package org.openecomp.sdc.be.components.distribution.engine;
public class ResourceArtifactInfoImpl extends ArtifactInfoImpl implements IResourceArtifactInfo {
- private String resourceName;
- private String resourceVersion;
- private String resourceUUID;
-
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
- public String getResourceVersion() {
- return resourceVersion;
- }
-
- public void setResourceVersion(String resourceVersion) {
- this.resourceVersion = resourceVersion;
- }
-
- public String getResourceUUID() {
- return resourceUUID;
- }
-
- public void setResourceUUID(String resourceUUID) {
- this.resourceUUID = resourceUUID;
- }
-
- @Override
- public String toString() {
- return "ResourceArtifactInfoImpl [resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resourceUUID=" + resourceUUID + super.toString() + "]";
- }
+ private String resourceName;
+ private String resourceVersion;
+ private String resourceUUID;
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
+
+ public String getResourceVersion() {
+ return resourceVersion;
+ }
+
+ public void setResourceVersion(String resourceVersion) {
+ this.resourceVersion = resourceVersion;
+ }
+
+ public String getResourceUUID() {
+ return resourceUUID;
+ }
+
+ public void setResourceUUID(String resourceUUID) {
+ this.resourceUUID = resourceUUID;
+ }
+
+ @Override
+ public String toString() {
+ return "ResourceArtifactInfoImpl [resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resourceUUID=" + resourceUUID + super.toString() + "]";
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java
index 50d1700f37..1d626805f6 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java
@@ -22,9 +22,9 @@ package org.openecomp.sdc.be.components.distribution.engine;
public class ServiceArtifactInfoImpl extends ArtifactInfoImpl implements IServiceArtifactInfo {
- @Override
- public String toString() {
- return "ServiceArtifactInfoImpl [" + super.toString() + "]";
- }
+ @Override
+ public String toString() {
+ return "ServiceArtifactInfoImpl [" + super.toString() + "]";
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java
index 0330a756c6..f3d17979ba 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java
@@ -20,26 +20,16 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
+import fj.data.Either;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.openecomp.sdc.be.config.ConfigurationManager;
-import org.openecomp.sdc.be.model.ArtifactDefinition;
-import org.openecomp.sdc.be.model.ComponentInstance;
-import org.openecomp.sdc.be.model.ComponentParametersView;
-import org.openecomp.sdc.be.model.Resource;
-import org.openecomp.sdc.be.model.Service;
+import org.openecomp.sdc.be.model.*;
import org.openecomp.sdc.be.model.category.CategoryDefinition;
import org.openecomp.sdc.be.model.category.SubCategoryDefinition;
import org.openecomp.sdc.be.model.jsontitan.operations.ToscaOperationFacade;
import org.openecomp.sdc.be.model.operations.api.IArtifactOperation;
-import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
import org.openecomp.sdc.be.model.operations.impl.InterfaceLifecycleOperation;
import org.openecomp.sdc.common.api.ArtifactTypeEnum;
import org.slf4j.Logger;
@@ -47,279 +37,224 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import fj.data.Either;
+import java.util.*;
+import java.util.stream.Collectors;
@Component("serviceDistributionArtifactsBuilder")
public class ServiceDistributionArtifactsBuilder {
- private int defaultArtifactInstallTimeout = 60;
-
- private static Logger logger = LoggerFactory.getLogger(ServiceDistributionArtifactsBuilder.class.getName());
-
- final static String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/";
- final static String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resources/%s/%s/artifacts/%s";
- final static String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s";
-
- final static String RESOURCE_INSTANCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resourceInstances/%s/artifacts/%s";
-
- @javax.annotation.Resource
- InterfaceLifecycleOperation interfaceLifecycleOperation;
-
- @javax.annotation.Resource
- IArtifactOperation artifactOperation;
-
- @Autowired
- ToscaOperationFacade toscaOperationFacade;
-
- /*
- * @javax.annotation.Resource private
- * InformationDeployedArtifactsBusinessLogic
- * informationDeployedArtifactsBusinessLogic;
- */
-
- @PostConstruct
- private void init() {
- defaultArtifactInstallTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
- .getDefaultHeatArtifactTimeoutMinutes();
- }
-
- public InterfaceLifecycleOperation getInterfaceLifecycleOperation() {
- return interfaceLifecycleOperation;
- }
-
- public void setInterfaceLifecycleOperation(InterfaceLifecycleOperation interfaceLifecycleOperation) {
- this.interfaceLifecycleOperation = interfaceLifecycleOperation;
- }
-
- public INotificationData buildResourceInstanceForDistribution(Service service, String distributionId) {
- INotificationData notificationData = new NotificationDataImpl();
-
- notificationData.setResources(convertRIToJsonContanier(service));
- notificationData.setServiceName(service.getName());
- notificationData.setServiceVersion(service.getVersion());
- notificationData.setDistributionID(distributionId);
- notificationData.setServiceUUID(service.getUUID());
- notificationData.setServiceDescription(service.getDescription());
- notificationData.setServiceInvariantUUID(service.getInvariantUUID());
- String workloadContext= ConfigurationManager.getConfigurationManager().getConfiguration().getWorkloadContext();
- if(workloadContext!=null){
- notificationData.setWorkloadContext(workloadContext);
- }
- logger.debug("Before returning notification data object {}", notificationData);
-
- return notificationData;
-
- }
-
- public INotificationData buildServiceForDistribution(INotificationData notificationData, Service service) {
-
- notificationData.setServiceArtifacts(convertServiceArtifactsToArtifactInfo(service));
-
- logger.debug("Before returning notification data object {}", notificationData);
-
- return notificationData;
-
- }
-
- private List<ArtifactInfoImpl> convertServiceArtifactsToArtifactInfo(Service service) {
-
- Map<String, ArtifactDefinition> serviceArtifactsMap = service.getDeploymentArtifacts();
- List<ArtifactDefinition> extractedServiceArtifacts = serviceArtifactsMap.values().stream()
- //filters all artifacts with existing EsId
- .filter(artifactDef -> artifactDef.checkEsIdExist())
- //collects all filtered artifacts with existing EsId to List
- .collect(Collectors.toList());
-
- Optional<ArtifactDefinition> toscaTemplateArtifactOptl = exrtactToscaTemplateArtifact(service);
- if(toscaTemplateArtifactOptl.isPresent()){
- extractedServiceArtifacts.add(toscaTemplateArtifactOptl.get());
- }
-
- Optional<ArtifactDefinition> toscaCsarArtifactOptl = exrtactToscaCsarArtifact(service);
- if(toscaCsarArtifactOptl.isPresent()){
- extractedServiceArtifacts.add(toscaCsarArtifactOptl.get());
- }
-
- List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertServiceArtifactToArtifactInfoImpl(service, extractedServiceArtifacts);
- return artifacts;
- }
-
- private Optional<ArtifactDefinition> exrtactToscaTemplateArtifact(Service service) {
- return service.getToscaArtifacts().values().stream()
- //filters TOSCA_TEMPLATE artifact
- .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_TEMPLATE.getType())).findAny();
- }
-
- private Optional<ArtifactDefinition> exrtactToscaCsarArtifact(Service service) {
- return service.getToscaArtifacts().values().stream()
- //filters TOSCA_CSAR artifact
- .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_CSAR.getType())).findAny();
- }
-
- private List<JsonContainerResourceInstance> convertRIToJsonContanier(Service service) {
- List<JsonContainerResourceInstance> ret = new ArrayList<JsonContainerResourceInstance>();
- if (service.getComponentInstances() != null) {
- for (ComponentInstance resourceInstance : service.getComponentInstances()) {
- String resoucreType = resourceInstance.getOriginType().getValue();
- List<ArtifactDefinition> artifactsDefList = getArtifactsWithPayload(resourceInstance);
- List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertToArtifactInfoImpl(service, resourceInstance,
- artifactsDefList);
-
- String resourceInvariantUUID = null;
- String resourceCategory = null;
- String resourceSubcategory = null;
-
- ComponentParametersView componentParametersView = new ComponentParametersView();
- componentParametersView.disableAll();
- componentParametersView.setIgnoreCategories(false);
- Either<Resource, StorageOperationStatus> componentResponse = toscaOperationFacade
- .getToscaElement(resourceInstance.getComponentUid(), componentParametersView);
-
- if (componentResponse.isRight()) {
- logger.debug("Resource {} Invariant UUID & Categories retrieving failed", resourceInstance.getComponentUid());
- } else {
- Resource resource = componentResponse.left().value();
- resourceInvariantUUID = resource.getInvariantUUID();
-
- List<CategoryDefinition> categories = resource.getCategories();
-
- if (categories != null) {
- CategoryDefinition categoryDefinition = categories.get(0);
-
- if (categoryDefinition != null) {
- resourceCategory = categoryDefinition.getName();
- List<SubCategoryDefinition> subcategories = categoryDefinition.getSubcategories();
- if (null != subcategories) {
- SubCategoryDefinition subCategoryDefinition = subcategories.get(0);
-
- if (subCategoryDefinition != null) {
- resourceSubcategory = subCategoryDefinition.getName();
- }
- }
- }
- }
- }
-
- JsonContainerResourceInstance jsonContainer = new JsonContainerResourceInstance(resourceInstance, resoucreType,
- rebuildArtifactswith120TimeoutInsteadOf60(artifacts)/*TODO used to send artifacts, the function is a fix to the short timeout bug in distribution*/);
- jsonContainer.setResourceInvariantUUID(resourceInvariantUUID);
- jsonContainer.setCategory(resourceCategory);
- jsonContainer.setSubcategory(resourceSubcategory);
- ret.add(jsonContainer);
- }
- }
- return ret;
- }
-
- private List<ArtifactInfoImpl> rebuildArtifactswith120TimeoutInsteadOf60(List<ArtifactInfoImpl> artifacts) {
- for(ArtifactInfoImpl artifact : artifacts){
- if(artifact.getArtifactTimeout().equals(60)){
- artifact.setArtifactTimeout(120);
- }
- }
- return artifacts;
- }
-
- private List<ArtifactDefinition> getArtifactsWithPayload(ComponentInstance resourceInstance) {
- List<ArtifactDefinition> ret = new ArrayList<ArtifactDefinition>();
-
- // List<ArtifactDefinition> informationDeployedArtifacts =
- // informationDeployedArtifactsBusinessLogic.getInformationalDeployedArtifactsForResourceInstance(resourceInstance);
- List<ArtifactDefinition> deployableArtifacts = new ArrayList<ArtifactDefinition>();
- // deployableArtifacts.addAll(informationDeployedArtifacts);
- if (resourceInstance.getDeploymentArtifacts() != null) {
- deployableArtifacts.addAll(resourceInstance.getDeploymentArtifacts().values());
- }
-
- for (ArtifactDefinition artifactDef : deployableArtifacts) {
- if (artifactDef.checkEsIdExist()) {
- ret.add(artifactDef);
- }
- }
-
- return ret;
- }
-
- /**
- * build the url for resource intance artifact
- *
- * @param service
- * @param resourceData
- * @param artifactName
- * @return
- */
- public static String buildResourceInstanceArtifactUrl(Service service, ComponentInstance resourceInstance,
- String artifactName) {
-
- String url = String.format(RESOURCE_INSTANCE_ARTIFACT_URL, service.getSystemName(), service.getVersion(),
- resourceInstance.getNormalizedName(), artifactName);
-
- logger.debug("After building artifact url {}", url);
-
- return url;
- }
-
- /**
- * build the url for resource intance artifact
- *
- * @param service
- * @param resourceData
- * @param artifactName
- * @return
- */
- public static String buildServiceArtifactUrl(Service service, String artifactName) {
-
- String url = String.format(SERVICE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), artifactName);
-
- logger.debug("After building artifact url {}", url);
-
- return url;
-
- }
-
- /**
- * Retrieve all deployment artifacts of all resources under a given service
- *
- * @param resourceArtifactsResult
- * @param service
- * @param deConfiguration
- * @return
- */
- public Either<Boolean, StorageOperationStatus> isServiceContainsDeploymentArtifacts(Service service) {
-
- Either<Boolean, StorageOperationStatus> result = Either.left(false);
- Map<String, ArtifactDefinition> serviseArtifactsMap = service.getDeploymentArtifacts();
- if (serviseArtifactsMap != null && !serviseArtifactsMap.isEmpty()) {
- result = Either.left(true);
- return result;
- }
-
- List<ComponentInstance> resourceInstances = service.getComponentInstances();
-
- if (resourceInstances != null) {
- for (ComponentInstance resourceInstance : resourceInstances) {
-
- Map<String, ArtifactDefinition> deploymentArtifactsMapper = resourceInstance.getDeploymentArtifacts();
- // List<ArtifactDefinition> informationDeployedArtifacts =
- // informationDeployedArtifactsBusinessLogic.getInformationalDeployedArtifactsForResourceInstance(resourceInstance);
-
- boolean isDeployableArtifactFound = isContainsPayload(deploymentArtifactsMapper.values());// ||
- // isContainsPayload(informationDeployedArtifacts);
- if (isDeployableArtifactFound) {
- result = Either.left(true);
- break;
- }
-
- }
-
- }
-
- return result;
- }
-
- private boolean isContainsPayload(Collection<ArtifactDefinition> collection) {
- boolean payLoadFound = collection != null && collection.stream().anyMatch(p -> p.checkEsIdExist());
- return payLoadFound;
- }
+ private static final Logger logger = LoggerFactory.getLogger(ServiceDistributionArtifactsBuilder.class);
+
+ static final String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/";
+ static final String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resources/%s/%s/artifacts/%s";
+ static final String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s";
+ static final String RESOURCE_INSTANCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resourceInstances/%s/artifacts/%s";
+
+ @javax.annotation.Resource
+ InterfaceLifecycleOperation interfaceLifecycleOperation;
+
+ @javax.annotation.Resource
+ IArtifactOperation artifactOperation;
+
+ @Autowired
+ ToscaOperationFacade toscaOperationFacade;
+
+ public InterfaceLifecycleOperation getInterfaceLifecycleOperation() {
+ return interfaceLifecycleOperation;
+ }
+
+ public void setInterfaceLifecycleOperation(InterfaceLifecycleOperation interfaceLifecycleOperation) {
+ this.interfaceLifecycleOperation = interfaceLifecycleOperation;
+ }
+
+ private String resolveWorkloadContext(String workloadContext) {
+ return workloadContext != null ? workloadContext :
+ ConfigurationManager.getConfigurationManager().getConfiguration().getWorkloadContext();
+ }
+
+ public INotificationData buildResourceInstanceForDistribution(Service service, String distributionId, String workloadContext) {
+ INotificationData notificationData = new NotificationDataImpl();
+
+ notificationData.setResources(convertRIsToJsonContanier(service));
+ notificationData.setServiceName(service.getName());
+ notificationData.setServiceVersion(service.getVersion());
+ notificationData.setDistributionID(distributionId);
+ notificationData.setServiceUUID(service.getUUID());
+ notificationData.setServiceDescription(service.getDescription());
+ notificationData.setServiceInvariantUUID(service.getInvariantUUID());
+ workloadContext = resolveWorkloadContext(workloadContext);
+ if (workloadContext!=null){
+ notificationData.setWorkloadContext(workloadContext);
+ }
+ logger.debug("Before returning notification data object {}", notificationData);
+
+ return notificationData;
+ }
+
+ public INotificationData buildServiceForDistribution(INotificationData notificationData, Service service) {
+
+ notificationData.setServiceArtifacts(convertServiceArtifactsToArtifactInfo(service));
+
+ logger.debug("Before returning notification data object {}", notificationData);
+
+ return notificationData;
+ }
+
+ private List<ArtifactInfoImpl> convertServiceArtifactsToArtifactInfo(Service service) {
+
+ Map<String, ArtifactDefinition> serviceArtifactsMap = service.getDeploymentArtifacts();
+ List<ArtifactDefinition> extractedServiceArtifacts = serviceArtifactsMap.values().stream()
+ //filters all artifacts with existing EsId
+ .filter(ArtifactDefinition::checkEsIdExist)
+ //collects all filtered artifacts with existing EsId to List
+ .collect(Collectors.toList());
+
+ Optional<ArtifactDefinition> toscaTemplateArtifactOptl = exrtactToscaTemplateArtifact(service);
+ if(toscaTemplateArtifactOptl.isPresent()){
+ extractedServiceArtifacts.add(toscaTemplateArtifactOptl.get());
+ }
+
+ Optional<ArtifactDefinition> toscaCsarArtifactOptl = exrtactToscaCsarArtifact(service);
+ if(toscaCsarArtifactOptl.isPresent()){
+ extractedServiceArtifacts.add(toscaCsarArtifactOptl.get());
+ }
+
+ return ArtifactInfoImpl.convertServiceArtifactToArtifactInfoImpl(service, extractedServiceArtifacts);
+ }
+
+ private Optional<ArtifactDefinition> exrtactToscaTemplateArtifact(Service service) {
+ return service.getToscaArtifacts().values().stream()
+ //filters TOSCA_TEMPLATE artifact
+ .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_TEMPLATE.getType())).findAny();
+ }
+
+ private Optional<ArtifactDefinition> exrtactToscaCsarArtifact(Service service) {
+ return service.getToscaArtifacts().values().stream()
+ //filters TOSCA_CSAR artifact
+ .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_CSAR.getType())).findAny();
+ }
+
+ private List<JsonContainerResourceInstance> convertRIsToJsonContanier(Service service) {
+ List<JsonContainerResourceInstance> ret = new ArrayList<>();
+ if (service.getComponentInstances() != null) {
+ for (ComponentInstance instance : service.getComponentInstances()) {
+ JsonContainerResourceInstance jsonContainer = new JsonContainerResourceInstance(instance, convertToArtifactsInfoImpl(service, instance));
+ ComponentParametersView filter = new ComponentParametersView();
+ filter.disableAll();
+ filter.setIgnoreCategories(false);
+ toscaOperationFacade.getToscaElement(instance.getComponentUid(), filter)
+ .left()
+ .bind(r->{fillJsonContainer(jsonContainer, (Resource) r); return Either.left(r);})
+ .right()
+ .forEach(r->logger.debug("Resource {} Invariant UUID & Categories retrieving failed", instance.getComponentUid()));
+ ret.add(jsonContainer);
+ }
+ }
+ return ret;
+ }
+
+ private void fillJsonContainer(JsonContainerResourceInstance jsonContainer, Resource resource) {
+ jsonContainer.setResourceInvariantUUID(resource.getInvariantUUID());
+ setCategories(jsonContainer, resource.getCategories());
+ }
+
+ private List<ArtifactInfoImpl> convertToArtifactsInfoImpl(Service service, ComponentInstance resourceInstance) {
+ List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertToArtifactInfoImpl(service, resourceInstance, getArtifactsWithPayload(resourceInstance));
+ artifacts.stream().forEach(ArtifactInfoImpl::updateArtifactTimeout);
+ return artifacts;
+ }
+
+ private void setCategories(JsonContainerResourceInstance jsonContainer, List<CategoryDefinition> categories) {
+ if (categories != null) {
+ CategoryDefinition categoryDefinition = categories.get(0);
+
+ if (categoryDefinition != null) {
+ jsonContainer.setCategory(categoryDefinition.getName());
+ List<SubCategoryDefinition> subcategories = categoryDefinition.getSubcategories();
+ if (null != subcategories) {
+ SubCategoryDefinition subCategoryDefinition = subcategories.get(0);
+
+ if (subCategoryDefinition != null) {
+ jsonContainer.setSubcategory(subCategoryDefinition.getName());
+ }
+ }
+ }
+ }
+ }
+
+ private List<ArtifactDefinition> getArtifactsWithPayload(ComponentInstance resourceInstance) {
+ List<ArtifactDefinition> ret = new ArrayList<>();
+
+ List<ArtifactDefinition> deployableArtifacts = new ArrayList<>();
+ if (resourceInstance.getDeploymentArtifacts() != null) {
+ deployableArtifacts.addAll(resourceInstance.getDeploymentArtifacts().values());
+ }
+
+ for (ArtifactDefinition artifactDef : deployableArtifacts) {
+ if (artifactDef.checkEsIdExist()) {
+ ret.add(artifactDef);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * build the URL for resource instance artifact
+ *
+ * @param service
+ * @param resourceInstance
+ * @param artifactName
+ * @return URL string
+ */
+ public static String buildResourceInstanceArtifactUrl(Service service, ComponentInstance resourceInstance,
+ String artifactName) {
+
+ String url = String.format(RESOURCE_INSTANCE_ARTIFACT_URL, service.getSystemName(), service.getVersion(),
+ resourceInstance.getNormalizedName(), artifactName);
+
+ logger.debug("After building artifact url {}", url);
+
+ return url;
+ }
+
+ /**
+ * build the URL for resource instance artifact
+ *
+ * @param service
+ * @param artifactName
+ * @return URL string
+ */
+ public static String buildServiceArtifactUrl(Service service, String artifactName) {
+
+ String url = String.format(SERVICE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), artifactName);
+
+ logger.debug("After building artifact url {}", url);
+
+ return url;
+
+ }
+
+ /**
+ * Verifies that the service or at least one of its instance contains deployment artifacts
+ *
+ * @param the service
+ * @return boolean
+ */
+ public boolean verifyServiceContainsDeploymentArtifacts(Service service) {
+ if (MapUtils.isNotEmpty(service.getDeploymentArtifacts())) {
+ return true;
+ }
+ boolean contains = false;
+ List<ComponentInstance> resourceInstances = service.getComponentInstances();
+ if (CollectionUtils.isNotEmpty(resourceInstances)) {
+ contains = resourceInstances.stream().anyMatch(i -> isContainsPayload(i.getDeploymentArtifacts()));
+ }
+ return contains;
+ }
+
+ private boolean isContainsPayload(Map<String, ArtifactDefinition> deploymentArtifacts) {
+ return deploymentArtifacts != null && deploymentArtifacts.values().stream().anyMatch(ArtifactDefinition::checkEsIdExist);
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java
index f8c0e3f593..3477648dbd 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java
@@ -22,5 +22,5 @@ package org.openecomp.sdc.be.components.distribution.engine;
public enum SubscriberTypeEnum {
- CONSUMER, PRODUCER;
+ CONSUMER, PRODUCER;
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java
index c522ca91b5..7a25c2ed69 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java
@@ -20,58 +20,58 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.concurrent.Callable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+
public class UebHealthCheckCall implements Callable<Boolean> {
- CambriaHandler cambriaHandler = new CambriaHandler();
+ CambriaHandler cambriaHandler = new CambriaHandler();
- String server;
- String publicApiKey;
+ String server;
+ String publicApiKey;
- private static Logger healthLogger = LoggerFactory.getLogger(DistributionEngineClusterHealth.UEB_HEALTH_LOG_CONTEXT);
+ private static final Logger healthLogger = LoggerFactory.getLogger(DistributionEngineClusterHealth.UEB_HEALTH_LOG_CONTEXT);
- private static Logger logger = LoggerFactory.getLogger(UebHealthCheckCall.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(UebHealthCheckCall.class);
- public UebHealthCheckCall(String server, String publicApiKey) {
- super();
- this.server = server;
- this.publicApiKey = publicApiKey;
- }
+ public UebHealthCheckCall(String server, String publicApiKey) {
+ super();
+ this.server = server;
+ this.publicApiKey = publicApiKey;
+ }
- @Override
- public Boolean call() {
+ @Override
+ public Boolean call() {
- healthLogger.trace("Going to run health check towards ueb server {}", server);
+ healthLogger.trace("Going to run health check towards ueb server {}", server);
- boolean result = false;
- CambriaErrorResponse cambriaErrorResponse = cambriaHandler.getApiKey(server, publicApiKey);
+ boolean result = false;
+ CambriaErrorResponse cambriaErrorResponse = cambriaHandler.getApiKey(server, publicApiKey);
- logger.debug("After running Health check towards ueb server {}. Result is {}", server, cambriaErrorResponse);
+ logger.debug("After running Health check towards ueb server {}. Result is {}", server, cambriaErrorResponse);
- if (cambriaErrorResponse.httpCode < CambriaErrorResponse.HTTP_INTERNAL_SERVER_ERROR) {
- logger.debug("After running Health check towards ueb server {}. Error code is {}. Set result to true", server, cambriaErrorResponse.httpCode);
- result = true;
- }
+ if (cambriaErrorResponse.httpCode < CambriaErrorResponse.HTTP_INTERNAL_SERVER_ERROR) {
+ logger.debug("After running Health check towards ueb server {}. Error code is {}. Set result to true", server, cambriaErrorResponse.httpCode);
+ result = true;
+ }
- healthLogger.trace("Result after running health check towards ueb server {} is {}. Returned result is {} ", server, cambriaErrorResponse, result);
+ healthLogger.trace("Result after running health check towards ueb server {} is {}. Returned result is {} ", server, cambriaErrorResponse, result);
- return result;
- }
+ return result;
+ }
- public String getServer() {
- return server;
- }
+ public String getServer() {
+ return server;
+ }
- public CambriaHandler getCambriaHandler() {
- return cambriaHandler;
- }
+ public CambriaHandler getCambriaHandler() {
+ return cambriaHandler;
+ }
- public void setCambriaHandler(CambriaHandler cambriaHandler) {
- this.cambriaHandler = cambriaHandler;
- }
+ public void setCambriaHandler(CambriaHandler cambriaHandler) {
+ this.cambriaHandler = cambriaHandler;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java
index d706e40f5c..8f0865f2ac 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java
@@ -20,105 +20,100 @@
package org.openecomp.sdc.be.components.distribution.engine;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
import org.openecomp.sdc.be.model.GroupDefinition;
import org.openecomp.sdc.be.model.GroupInstance;
import org.openecomp.sdc.be.model.GroupInstanceProperty;
import org.openecomp.sdc.be.model.GroupProperty;
import org.openecomp.sdc.common.api.Constants;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
public class VfModuleArtifactPayload {
-
- private String vfModuleModelName, vfModuleModelInvariantUUID, vfModuleModelVersion, vfModuleModelUUID, vfModuleModelCustomizationUUID, vfModuleModelDescription;
- private Boolean isBase;
- private List<String> artifacts;
- private Map< String, Object> properties;
-
- public VfModuleArtifactPayload(GroupDefinition group) {
- vfModuleModelName = group.getName();
- vfModuleModelInvariantUUID = group.getInvariantUUID();
- vfModuleModelVersion = group.getVersion();
- vfModuleModelUUID = group.getGroupUUID();
- vfModuleModelDescription = group.getDescription();
-
- artifacts = group.getArtifactsUuid();
- // Base Value is set from properties
- setBaseValue(group);
-
- }
-
- public VfModuleArtifactPayload(GroupInstance group) {
- vfModuleModelName = group.getGroupName();
- vfModuleModelInvariantUUID = group.getInvariantUUID();
- vfModuleModelVersion = group.getVersion();
- vfModuleModelUUID = group.getGroupUUID();
- vfModuleModelCustomizationUUID = group.getCustomizationUUID();
- vfModuleModelDescription = group.getDescription();
-
- artifacts = group.getArtifactsUuid();
- artifacts.addAll(group.getGroupInstanceArtifactsUuid());
-
- // Base Value is set from properties
- setBaseValue(group);
-
- if(group.convertToGroupInstancesProperties() != null)
- setProperties(group.convertToGroupInstancesProperties());
- //converts List of GroupInstanceProperties to Map propertyName : GroupInstanceProperty ()
- //setProperties(group.getGroupInstancesProperties().stream().collect(Collectors.toMap(p->p.getName(), p->p)));
-
- }
-
- private void setBaseValue(GroupInstance group) {
- if (group.convertToGroupInstancesProperties() != null) {
- Optional<GroupInstanceProperty> findBaseProperty = group.convertToGroupInstancesProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny();
- if (findBaseProperty.isPresent()) {
- isBase = Boolean.valueOf(findBaseProperty.get().getValue());
- }
-
- }
- }
- private void setBaseValue(GroupDefinition group) {
- if (group.getProperties() != null) {
- Optional<GroupProperty> findBaseProperty = group.convertToGroupProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny();
- if (findBaseProperty.isPresent()) {
- isBase = Boolean.valueOf(findBaseProperty.get().getValue());
- }
-
- }
- }
-
-
-
- public List<String> getArtifacts() {
- return artifacts;
- }
-
- public void setArtifacts(List<String> artifacts) {
- this.artifacts = artifacts;
- }
-
-
-
- public Map<String, Object> getProperties() {
- return properties;
- }
-
- /*public void setProperties(Map<String, Object> properties) {
- this.properties = properties;
- }*/
-
- public void setProperties(List<GroupInstanceProperty> properties) {
- this.properties = properties.stream().filter(p -> !p.getName().equals(Constants.IS_BASE)).collect(
- Collectors.toMap(x -> x.getName(), x -> x.getValue() == null? "":x.getValue() ));
- }
-
- public static int compareByGroupName(VfModuleArtifactPayload art1, VfModuleArtifactPayload art2) {
- Float thisCounter = Float.parseFloat(art1.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.'));
- Float otherCounter = Float.parseFloat(art2.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.'));
- return thisCounter.compareTo(otherCounter);
- }
+
+ private String vfModuleModelName, vfModuleModelInvariantUUID, vfModuleModelVersion, vfModuleModelUUID, vfModuleModelCustomizationUUID, vfModuleModelDescription;
+ private Boolean isBase;
+ private List<String> artifacts;
+ private Map< String, Object> properties;
+
+ public VfModuleArtifactPayload(GroupDefinition group) {
+ vfModuleModelName = group.getName();
+ vfModuleModelInvariantUUID = group.getInvariantUUID();
+ vfModuleModelVersion = group.getVersion();
+ vfModuleModelUUID = group.getGroupUUID();
+ vfModuleModelDescription = group.getDescription();
+
+ artifacts = group.getArtifactsUuid();
+ // Base Value is set from properties
+ setBaseValue(group);
+
+ }
+
+ public VfModuleArtifactPayload(GroupInstance group) {
+ vfModuleModelName = group.getGroupName();
+ vfModuleModelInvariantUUID = group.getInvariantUUID();
+ vfModuleModelVersion = group.getVersion();
+ vfModuleModelUUID = group.getGroupUUID();
+ vfModuleModelCustomizationUUID = group.getCustomizationUUID();
+ vfModuleModelDescription = group.getDescription();
+
+ artifacts = new ArrayList<>(group.getArtifactsUuid());
+ artifacts.addAll(group.getGroupInstanceArtifactsUuid());
+
+ // Base Value is set from properties
+ setBaseValue(group);
+
+ if(group.convertToGroupInstancesProperties() != null)
+ setProperties(group.convertToGroupInstancesProperties());
+ }
+
+ private void setBaseValue(GroupInstance group) {
+ if (group.convertToGroupInstancesProperties() != null) {
+ Optional<GroupInstanceProperty> findBaseProperty = group.convertToGroupInstancesProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny();
+ if (findBaseProperty.isPresent()) {
+ isBase = Boolean.valueOf(findBaseProperty.get().getValue());
+ }
+
+ }
+ }
+ private void setBaseValue(GroupDefinition group) {
+ if (group.getProperties() != null) {
+ Optional<GroupProperty> findBaseProperty = group.convertToGroupProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny();
+ if (findBaseProperty.isPresent()) {
+ isBase = Boolean.valueOf(findBaseProperty.get().getValue());
+ }
+
+ }
+ }
+
+
+
+ public List<String> getArtifacts() {
+ return artifacts;
+ }
+
+ public void setArtifacts(List<String> artifacts) {
+ this.artifacts = artifacts;
+ }
+
+
+
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+
+ public void setProperties(List<GroupInstanceProperty> properties) {
+ this.properties = properties.stream().filter(p -> !p.getName().equals(Constants.IS_BASE)).collect(
+ Collectors.toMap(x -> x.getName(), x -> x.getValue() == null? "":x.getValue() ));
+ }
+
+ public static int compareByGroupName(VfModuleArtifactPayload art1, VfModuleArtifactPayload art2) {
+ Float thisCounter = Float.parseFloat(art1.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.'));
+ Float otherCounter = Float.parseFloat(art2.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.'));
+ return thisCounter.compareTo(otherCounter);
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java
new file mode 100644
index 0000000000..e6f2cc634f
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java
@@ -0,0 +1,11 @@
+package org.openecomp.sdc.be.components.distribution.engine.config;
+
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ComponentScan({"org.openecomp.sdc.be.components.distribution.engine",
+ })
+public class DistributionEngineSpringConfig {
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java
new file mode 100644
index 0000000000..7f9dd45e67
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java
@@ -0,0 +1,9 @@
+package org.openecomp.sdc.be.components.distribution.engine.report;
+
+import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotification;
+
+public interface DistributionCompleteReporter {
+
+ void reportDistributionComplete(DistributionStatusNotification distributionStatusNotification);
+
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java
new file mode 100644
index 0000000000..4e06b0ed0e
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.be.components.distribution.engine.report;
+
+import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotification;
+import org.openecomp.sdc.be.components.distribution.engine.rest.MSORestClient;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+public class MsoDistributionCompleteReporter implements DistributionCompleteReporter {
+
+ @Resource
+ private MSORestClient msoClient;
+
+ @Override
+ public void reportDistributionComplete(DistributionStatusNotification distributionStatusNotification) {
+ msoClient.notifyDistributionComplete(distributionStatusNotification.getDistributionID(), distributionStatusNotification.getStatus(), distributionStatusNotification.getErrorReason());
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java
new file mode 100644
index 0000000000..45727fb28f
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java
@@ -0,0 +1,23 @@
+package org.openecomp.sdc.be.components.distribution.engine.rest;
+
+/**
+ * a class which represents an MSO distribution status rest request body
+ */
+public class DistributionStatusRequest {
+
+ private String status;
+ private String errorReason;
+
+ public DistributionStatusRequest(String status, String errorReason) {
+ this.status = status;
+ this.errorReason = errorReason;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public String getErrorReason() {
+ return errorReason;
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java
new file mode 100644
index 0000000000..76263f942d
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java
@@ -0,0 +1,69 @@
+package org.openecomp.sdc.be.components.distribution.engine.rest;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.eclipse.jetty.util.URIUtil;
+import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotificationEnum;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.common.http.client.api.*;
+import org.openecomp.sdc.common.http.config.BasicAuthorization;
+import org.openecomp.sdc.common.http.config.ExternalServiceConfig;
+import org.openecomp.sdc.common.http.config.HttpClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+@Component
+public class MSORestClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MSORestClient.class);
+ private static final Gson gson = new Gson();
+ @VisibleForTesting
+ static final String DISTRIBUTIONS_RESOURCE_CONFIG_PARAM = "distributions";
+
+ private ExternalServiceConfig serviceConfig = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getMsoConfig();
+
+ public MSORestClient() {
+ HttpClientConfig httpClientConfig = serviceConfig.getHttpClientConfig();
+ int numOfRetries = httpClientConfig.getNumOfRetries();
+ if ( numOfRetries > 0 ) {
+ httpClientConfig.setRetryHandler(RetryHandlers.getDefault(numOfRetries));
+ }
+ }
+
+ public HttpResponse<String> notifyDistributionComplete(String distributionId, DistributionStatusNotificationEnum distributionStatusEnum, String errReason) {
+ try {
+ return doNotifyDistributionComplete(distributionId, distributionStatusEnum, errReason);
+ }
+ catch(HttpExecuteException e) {
+ LOGGER.debug("The request to mso failed with exception ", e);
+ return Responses.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private HttpResponse<String> doNotifyDistributionComplete(String distributionId, DistributionStatusNotificationEnum distributionStatusEnum, String errReason) throws HttpExecuteException {
+ StringEntity entity = new StringEntity(gson.toJson(new DistributionStatusRequest(distributionStatusEnum.name(), errReason)), ContentType.APPLICATION_JSON);
+ HttpResponse<String> response = HttpRequest.patch(buildMsoDistributionUrl(distributionId), buildReqHeader(), entity, serviceConfig.getHttpClientConfig());
+ LOGGER.info("response from mso - status code: {}, status description: {}, response: {}, ", response.getStatusCode(), response.getDescription(), response.getResponse());
+ return response;
+ }
+
+ private Properties buildReqHeader() {
+ Properties properties = new Properties();
+ BasicAuthorization basicAuth = serviceConfig.getHttpClientConfig().getBasicAuthorization();
+ RestUtils.addBasicAuthHeader(properties, basicAuth.getUserName(), basicAuth.getPassword());
+ return properties;
+ }
+
+ private String buildMsoDistributionUrl(String distributionId) {
+ String msoBaseUrl = serviceConfig.getHttpRequestConfig().getServerRootUrl();
+ String distributionsPath = serviceConfig.getHttpRequestConfig().getResourceNamespaces().get(DISTRIBUTIONS_RESOURCE_CONFIG_PARAM);
+ String distributionsApiPath = distributionsPath + URIUtil.SLASH + distributionId;
+ return msoBaseUrl + distributionsApiPath;
+ }
+
+}