diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution')
42 files changed, 4119 insertions, 2965 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java new file mode 100644 index 0000000000..0519f435e3 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/AaiRequestHandler.java @@ -0,0 +1,84 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import org.apache.http.conn.ConnectTimeoutException; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.common.api.Constants; +import org.openecomp.sdc.common.datastructure.FunctionalInterfaces; +import org.openecomp.sdc.common.datastructure.FunctionalInterfaces.SupplierThrows; +import org.openecomp.sdc.common.http.client.api.HttpExecuteException; +import org.openecomp.sdc.common.http.client.api.HttpRequest; +import org.openecomp.sdc.common.http.client.api.HttpResponse; +import org.openecomp.sdc.common.http.client.api.Responses; +import org.openecomp.sdc.common.http.config.ExternalServiceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.util.Properties; +import java.util.UUID; + +@Component +public class AaiRequestHandler { + + private static final Logger logger = LoggerFactory.getLogger(AaiRequestHandler.class); + private ExternalServiceConfig aaiConfig; + + protected static final String OPERATIONAL_ENV_RESOURCE_CONFIG_PARAM = "operationalEnvironments"; + protected static final String OPERATIONAL_ENV_RESOURCE = "/operational-environment"; + + @PostConstruct + public void init() { + logger.debug("AaiRequestHandler has been initialized."); + + aaiConfig = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getAaiConfig(); + logger.debug("AaiRequestHandler Configuration={}", aaiConfig); + } + + + public HttpResponse<String> getOperationalEnvById(String id) { + Properties headers = createHeaders(); + String url = String.format("%s%s%s/%s", + aaiConfig.getHttpRequestConfig().getServerRootUrl(), + aaiConfig.getHttpRequestConfig().getResourceNamespaces().get(OPERATIONAL_ENV_RESOURCE_CONFIG_PARAM), + OPERATIONAL_ENV_RESOURCE, id); + + SupplierThrows<HttpResponse<String>, Exception> httpGet = () -> HttpRequest.get(url, headers, aaiConfig.getHttpClientConfig()); + long maxRetries = aaiConfig.getHttpClientConfig().getNumOfRetries(); + try { + return FunctionalInterfaces.retryMethodOnException(httpGet, this::retryOnException, maxRetries); + } + catch (Exception e) { + logger.debug("Request failed with exception {}", getCause(e).getMessage()); + return Responses.INTERNAL_SERVER_ERROR; + } + } + + + private boolean retryOnException(Exception e) { + Throwable cause = getCause(e); + return !(cause instanceof ConnectTimeoutException || cause instanceof ConnectException || cause instanceof SocketTimeoutException); + } + + + private Throwable getCause(Exception e) { + if (e instanceof HttpExecuteException) { + return e.getCause(); + } + return e; + } + + + private Properties createHeaders() { + Properties headers = new Properties(); + headers.put(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON); + headers.put(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON); + headers.put(Constants.X_TRANSACTION_ID_HEADER, UUID.randomUUID().toString()); + + return headers; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java index e1a1270fe3..9c46c68cd8 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ArtifactInfoImpl.java @@ -20,177 +20,185 @@ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.model.ArtifactDefinition; +import org.openecomp.sdc.be.model.ComponentInstance; +import org.openecomp.sdc.be.model.Service; +import org.openecomp.sdc.common.api.ArtifactTypeEnum; + import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.openecomp.sdc.be.model.ArtifactDefinition; -import org.openecomp.sdc.be.model.ComponentInstance; -import org.openecomp.sdc.be.model.Service; -import org.openecomp.sdc.common.api.ArtifactTypeEnum; - public class ArtifactInfoImpl implements IArtifactInfo { - private String artifactName; - private ArtifactTypeEnum artifactType; - private String artifactURL; - private String artifactChecksum; - private String artifactDescription; - private Integer artifactTimeout; - private String artifactUUID; - private String artifactVersion; - private String generatedFromUUID; - private List<String> relatedArtifacts; - - public ArtifactInfoImpl() { - } - - private ArtifactInfoImpl(ArtifactDefinition artifactDef, String generatedFromUUID, List<String> relatedArtifacts) { - artifactName = artifactDef.getArtifactName(); - artifactType = ArtifactTypeEnum.findType(artifactDef.getArtifactType()); - artifactChecksum = artifactDef.getArtifactChecksum(); - artifactDescription = artifactDef.getDescription(); - artifactTimeout = artifactDef.getTimeout(); - artifactUUID = artifactDef.getArtifactUUID(); - artifactVersion = artifactDef.getArtifactVersion(); - this.relatedArtifacts = relatedArtifacts; - this.generatedFromUUID = generatedFromUUID; - } - - public static List<ArtifactInfoImpl> convertToArtifactInfoImpl(Service service, ComponentInstance resourceInstance, Collection<ArtifactDefinition> list) { - List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>(); - Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId)); - if (list != null) { - for (ArtifactDefinition artifactDef : list) { - String generatedFromUUID = null; - if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) { - ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0); - generatedFromUUID = artifactFrom.getArtifactUUID(); - } - ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, resourceInstance.getDeploymentArtifacts())); - String artifactURL = ServiceDistributionArtifactsBuilder.buildResourceInstanceArtifactUrl(service, resourceInstance, artifactDef.getArtifactName()); - artifactInfoImpl.setArtifactURL(artifactURL); - ret.add(artifactInfoImpl); - } - } - return ret; - - } - - public static List<ArtifactInfoImpl> convertServiceArtifactToArtifactInfoImpl(Service service, Collection<ArtifactDefinition> list) { - List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>(); - Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId)); - if (list != null) { - for (ArtifactDefinition artifactDef : list) { - String generatedFromUUID = null; - if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) { - ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0); - generatedFromUUID = artifactFrom.getArtifactUUID(); - } - ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, service.getDeploymentArtifacts())); - String artifactURL = ServiceDistributionArtifactsBuilder.buildServiceArtifactUrl(service, artifactDef.getArtifactName()); - artifactInfoImpl.setArtifactURL(artifactURL); - ret.add(artifactInfoImpl); - } - } - return ret; - - } - - private static List<String> getUpdatedRequiredArtifactsFromNamesToUuids(ArtifactDefinition artifactDefinition, Map<String, ArtifactDefinition> artifacts) { - List<String> requiredArtifacts = null; - if (artifactDefinition != null && artifactDefinition.getRequiredArtifacts() != null && !artifactDefinition.getRequiredArtifacts().isEmpty() && artifacts != null && !artifacts.isEmpty()) { - requiredArtifacts = artifacts.values().stream().filter(art -> artifactDefinition.getRequiredArtifacts().contains(art.getArtifactName())).map(art -> art.getArtifactUUID()).collect(Collectors.toList()); - } - return requiredArtifacts; - } - - public String getArtifactName() { - return artifactName; - } - - public void setArtifactName(String artifactName) { - this.artifactName = artifactName; - } - - public ArtifactTypeEnum getArtifactType() { - return artifactType; - } - - public void setArtifactType(ArtifactTypeEnum artifactType) { - this.artifactType = artifactType; - } - - public String getArtifactURL() { - return artifactURL; - } - - public void setArtifactURL(String artifactURL) { - this.artifactURL = artifactURL; - } - - public String getArtifactChecksum() { - return artifactChecksum; - } - - public void setArtifactChecksum(String artifactChecksum) { - this.artifactChecksum = artifactChecksum; - } - - public String getArtifactDescription() { - return artifactDescription; - } - - public void setArtifactDescription(String artifactDescription) { - this.artifactDescription = artifactDescription; - } - - public Integer getArtifactTimeout() { - return artifactTimeout; - } - - public void setArtifactTimeout(Integer artifactTimeout) { - this.artifactTimeout = artifactTimeout; - } - - public List<String> getRelatedArtifacts() { - return relatedArtifacts; - } - - public void setRelatedArtifacts(List<String> relatedArtifacts) { - this.relatedArtifacts = relatedArtifacts; - } - - @Override - public String toString() { - return "ArtifactInfoImpl [artifactName=" + artifactName + ", artifactType=" + artifactType + ", artifactURL=" + artifactURL + ", artifactChecksum=" + artifactChecksum + ", artifactDescription=" + artifactDescription + ", artifactTimeout=" - + artifactTimeout + ", artifactUUID=" + artifactUUID + ", artifactVersion=" + artifactVersion + ", generatedFromUUID=" + generatedFromUUID + ", relatedArtifacts=" + relatedArtifacts + "]"; - } - - public String getArtifactUUID() { - return artifactUUID; - } - - public void setArtifactUUID(String artifactUUID) { - this.artifactUUID = artifactUUID; - } - - public String getArtifactVersion() { - return artifactVersion; - } - - public void setArtifactVersion(String artifactVersion) { - this.artifactVersion = artifactVersion; - } - - public String getGeneratedFromUUID() { - return generatedFromUUID; - } - - public void setGeneratedFromUUID(String generatedFromUUID) { - this.generatedFromUUID = generatedFromUUID; - } + private String artifactName; + private ArtifactTypeEnum artifactType; + private String artifactURL; + private String artifactChecksum; + private String artifactDescription; + private Integer artifactTimeout; + private String artifactUUID; + private String artifactVersion; + private String generatedFromUUID; + private List<String> relatedArtifacts; + + public ArtifactInfoImpl() { + } + + private ArtifactInfoImpl(ArtifactDefinition artifactDef, String generatedFromUUID, List<String> relatedArtifacts) { + artifactName = artifactDef.getArtifactName(); + artifactType = ArtifactTypeEnum.findType(artifactDef.getArtifactType()); + artifactChecksum = artifactDef.getArtifactChecksum(); + artifactDescription = artifactDef.getDescription(); + artifactTimeout = artifactDef.getTimeout(); + artifactUUID = artifactDef.getArtifactUUID(); + artifactVersion = artifactDef.getArtifactVersion(); + this.relatedArtifacts = relatedArtifacts; + this.generatedFromUUID = generatedFromUUID; + } + + public static List<ArtifactInfoImpl> convertToArtifactInfoImpl(Service service, ComponentInstance resourceInstance, Collection<ArtifactDefinition> list) { + List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>(); + Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId)); + if (list != null) { + for (ArtifactDefinition artifactDef : list) { + String generatedFromUUID = null; + if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) { + ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0); + generatedFromUUID = artifactFrom.getArtifactUUID(); + } + ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, resourceInstance.getDeploymentArtifacts())); + String artifactURL = ServiceDistributionArtifactsBuilder.buildResourceInstanceArtifactUrl(service, resourceInstance, artifactDef.getArtifactName()); + artifactInfoImpl.setArtifactURL(artifactURL); + ret.add(artifactInfoImpl); + } + } + ret.stream().forEach(ArtifactInfoImpl::updateArtifactTimeout); + return ret; + + } + + public static List<ArtifactInfoImpl> convertServiceArtifactToArtifactInfoImpl(Service service, Collection<ArtifactDefinition> list) { + List<ArtifactInfoImpl> ret = new ArrayList<ArtifactInfoImpl>(); + Map<String, List<ArtifactDefinition>> artifactIdToDef = list.stream().collect(Collectors.groupingBy(ArtifactDefinition::getUniqueId)); + if (list != null) { + for (ArtifactDefinition artifactDef : list) { + String generatedFromUUID = null; + if (artifactDef.getGeneratedFromId() != null && !artifactDef.getGeneratedFromId().isEmpty()) { + ArtifactDefinition artifactFrom = artifactIdToDef.get(artifactDef.getGeneratedFromId()).get(0); + generatedFromUUID = artifactFrom.getArtifactUUID(); + } + ArtifactInfoImpl artifactInfoImpl = new ArtifactInfoImpl(artifactDef, generatedFromUUID, getUpdatedRequiredArtifactsFromNamesToUuids(artifactDef, service.getDeploymentArtifacts())); + String artifactURL = ServiceDistributionArtifactsBuilder.buildServiceArtifactUrl(service, artifactDef.getArtifactName()); + artifactInfoImpl.setArtifactURL(artifactURL); + ret.add(artifactInfoImpl); + } + } + return ret; + + } + + private static List<String> getUpdatedRequiredArtifactsFromNamesToUuids(ArtifactDefinition artifactDefinition, Map<String, ArtifactDefinition> artifacts) { + List<String> requiredArtifacts = null; + if (artifactDefinition != null && artifactDefinition.getRequiredArtifacts() != null && !artifactDefinition.getRequiredArtifacts().isEmpty() && artifacts != null && !artifacts.isEmpty()) { + requiredArtifacts = artifacts.values().stream().filter(art -> artifactDefinition.getRequiredArtifacts().contains(art.getArtifactName())).map(art -> art.getArtifactUUID()).collect(Collectors.toList()); + } + return requiredArtifacts; + } + + public String getArtifactName() { + return artifactName; + } + + public void setArtifactName(String artifactName) { + this.artifactName = artifactName; + } + + public ArtifactTypeEnum getArtifactType() { + return artifactType; + } + + public void setArtifactType(ArtifactTypeEnum artifactType) { + this.artifactType = artifactType; + } + + public String getArtifactURL() { + return artifactURL; + } + + public void setArtifactURL(String artifactURL) { + this.artifactURL = artifactURL; + } + + public String getArtifactChecksum() { + return artifactChecksum; + } + + public void setArtifactChecksum(String artifactChecksum) { + this.artifactChecksum = artifactChecksum; + } + + public String getArtifactDescription() { + return artifactDescription; + } + + public void setArtifactDescription(String artifactDescription) { + this.artifactDescription = artifactDescription; + } + + public Integer getArtifactTimeout() { + return artifactTimeout; + } + + public void setArtifactTimeout(Integer artifactTimeout) { + this.artifactTimeout = artifactTimeout; + } + + public List<String> getRelatedArtifacts() { + return relatedArtifacts; + } + + public void setRelatedArtifacts(List<String> relatedArtifacts) { + this.relatedArtifacts = relatedArtifacts; + } + + @Override + public String toString() { + return "ArtifactInfoImpl [artifactName=" + artifactName + ", artifactType=" + artifactType + ", artifactURL=" + artifactURL + ", artifactChecksum=" + artifactChecksum + ", artifactDescription=" + artifactDescription + ", artifactTimeout=" + + artifactTimeout + ", artifactUUID=" + artifactUUID + ", artifactVersion=" + artifactVersion + ", generatedFromUUID=" + generatedFromUUID + ", relatedArtifacts=" + relatedArtifacts + "]"; + } + + public String getArtifactUUID() { + return artifactUUID; + } + + public void setArtifactUUID(String artifactUUID) { + this.artifactUUID = artifactUUID; + } + + public String getArtifactVersion() { + return artifactVersion; + } + + public void setArtifactVersion(String artifactVersion) { + this.artifactVersion = artifactVersion; + } + + public String getGeneratedFromUUID() { + return generatedFromUUID; + } + + public void setGeneratedFromUUID(String generatedFromUUID) { + this.generatedFromUUID = generatedFromUUID; + } + + public void updateArtifactTimeout(){ + int currentConfigTimeout = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getCurrentArtifactInstallationTimeout(); + if(artifactTimeout == null || artifactTimeout < currentConfigTimeout) + artifactTimeout = currentConfigTimeout; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java index 149ea2286a..3251f3d047 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaErrorResponse.java @@ -20,67 +20,67 @@ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; + import java.util.ArrayList; import java.util.List; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; - public class CambriaErrorResponse { - public static final int HTTP_OK = 200; + public static final int HTTP_OK = 200; - public static final int HTTP_INTERNAL_SERVER_ERROR = 500; + public static final int HTTP_INTERNAL_SERVER_ERROR = 500; - CambriaOperationStatus operationStatus; - Integer httpCode; - List<String> variables = new ArrayList<String>(); + CambriaOperationStatus operationStatus; + Integer httpCode; + List<String> variables = new ArrayList<String>(); - public CambriaErrorResponse() { - super(); - } + public CambriaErrorResponse() { + super(); + } - public CambriaErrorResponse(CambriaOperationStatus operationStatus) { - super(); - this.operationStatus = operationStatus; - } + public CambriaErrorResponse(CambriaOperationStatus operationStatus) { + super(); + this.operationStatus = operationStatus; + } - public CambriaErrorResponse(CambriaOperationStatus operationStatus, Integer httpCode) { - super(); - this.operationStatus = operationStatus; - this.httpCode = httpCode; - } + public CambriaErrorResponse(CambriaOperationStatus operationStatus, Integer httpCode) { + super(); + this.operationStatus = operationStatus; + this.httpCode = httpCode; + } - public CambriaOperationStatus getOperationStatus() { - return operationStatus; - } + public CambriaOperationStatus getOperationStatus() { + return operationStatus; + } - public void setOperationStatus(CambriaOperationStatus operationStatus) { - this.operationStatus = operationStatus; - } + public void setOperationStatus(CambriaOperationStatus operationStatus) { + this.operationStatus = operationStatus; + } - public Integer getHttpCode() { - return httpCode; - } + public Integer getHttpCode() { + return httpCode; + } - public void setHttpCode(Integer httpCode) { - this.httpCode = httpCode; - } + public void setHttpCode(Integer httpCode) { + this.httpCode = httpCode; + } - public void addVariable(String variable) { - variables.add(variable); - } + public void addVariable(String variable) { + variables.add(variable); + } - public List<String> getVariables() { - return variables; - } + public List<String> getVariables() { + return variables; + } - public void setVariables(List<String> variables) { - this.variables = variables; - } + public void setVariables(List<String> variables) { + this.variables = variables; + } - @Override - public String toString() { - return "CambriaErrorResponse [operationStatus=" + operationStatus + ", httpCode=" + httpCode + ", variables=" + variables + "]"; - } + @Override + public String toString() { + return "CambriaErrorResponse [operationStatus=" + operationStatus + ", httpCode=" + httpCode + ", variables=" + variables + "]"; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java index c496715a02..d6fee9a9d3 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandler.java @@ -20,25 +20,7 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.http.HttpStatus; -import org.openecomp.sdc.be.config.BeEcompErrorManager; -import org.openecomp.sdc.be.config.ConfigurationManager; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; -import org.openecomp.sdc.common.config.EcompErrorName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.att.nsa.apiClient.credentials.ApiCredential; import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.apiClient.http.HttpObjectNotFoundException; import com.att.nsa.cambria.client.CambriaBatchingPublisher; @@ -53,574 +35,591 @@ import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.cambria.client.CambriaIdentityManager; import com.att.nsa.cambria.client.CambriaPublisher.message; import com.att.nsa.cambria.client.CambriaTopicManager; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; - import fj.data.Either; +import org.apache.http.HttpStatus; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Component("cambriaHandler") public class CambriaHandler { - private static Logger logger = LoggerFactory.getLogger(CambriaHandler.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(CambriaHandler.class); + + private static final String PARTITION_KEY = "asdc" + "aa"; + + private final String SEND_NOTIFICATION = "send notification"; + + private Gson gson = new Gson(); + + private static final String CONSUMER_ID = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionStatusTopic().getConsumerId(); + + + + /** + * process the response error from Cambria client + * + * @param message + * @return + */ + private Integer processMessageException(String message) { + + String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" }; + + Integer result = checkPattern(patterns[0], message, 2); + if (result != null) { + return result; + } + result = checkPattern(patterns[1], message, 2); + + return result; + + } + + /** + * check whether the message has a match with a given pattern inside it + * + * @param patternStr + * @param message + * @param groupIndex + * @return + */ + private Integer checkPattern(String patternStr, String message, int groupIndex) { + Integer result = null; + + Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(message); + boolean find = matcher.find(); + if (find) { + String httpCode = matcher.group(groupIndex); + if (httpCode != null) { + try { + result = Integer.valueOf(httpCode); + } catch (NumberFormatException e) { + logger.debug("Failed to parse http code {}", httpCode); + } + } + } + return result; + } + + /** + * retrieve all topics from U-EB server + * + * @param hostSet + * @return + */ + public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) { + + CambriaTopicManager createTopicManager = null; + try { + + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); + + Set<String> topics = createTopicManager.getTopics(); + + if (topics == null || true == topics.isEmpty()) { + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null); + return Either.right(cambriaErrorResponse); + } + + return Either.left(topics); + + } catch (IOException | GeneralSecurityException e) { + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + logger.debug("Failed to fetch topics from U-EB server", e); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics"); + + return Either.right(cambriaErrorResponse); + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + } + + /** + * process the error message from Cambria client. + * + * set Cambria status and http code in case we succeed to fetch it + * + * @return + */ + private CambriaErrorResponse processError(Exception e) { + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(); + + Integer httpCode = processMessageException(e.getMessage()); + + if (httpCode != null) { + cambriaErrorResponse.setHttpCode(httpCode); + switch (httpCode.intValue()) { + + case 401: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR); + break; + case 409: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST); + break; + case 500: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR); + break; + default: + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); + } + } else { + + boolean found = false; + Throwable throwable = e.getCause(); + if (throwable != null) { + String message = throwable.getMessage(); + + Throwable cause = throwable.getCause(); + + if (cause != null) { + Class<?> clazz = cause.getClass(); + String className = clazz.getName(); + if (className.endsWith("UnknownHostException")) { + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR); + cambriaErrorResponse.addVariable(message); + found = true; + } + } + } + + if (false == found) { + cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); + cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); + } + } + + return cambriaErrorResponse; + } + + /** + * write the error to the log + * + * @param cambriaErrorResponse + * @param errorMessage + * @param methodName + * @param operationDesc + */ + private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) { + + String httpCode = cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode()); + + switch (cambriaErrorResponse.getOperationStatus()) { + case UNKNOWN_HOST_ERROR: + String hostname = cambriaErrorResponse.getVariables().get(0); + BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode); + break; + case AUTHENTICATION_ERROR: + BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode); + break; + case CONNNECTION_ERROR: + BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode); + break; + + case INTERNAL_SERVER_ERROR: + BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc); + break; + default: + break; + } + + } + + /** + * create a topic if it does not exists in the topicsList + * + * @param hostSet + * - list of U-EB servers + * @param apiKey + * @param secretKey + * @param topicName + * - topic to create + * @param partitionCount + * @param replicationCount + * @return + */ + public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) { + + CambriaTopicManager createTopicManager = null; + try { + + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey)); + + createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); + + } catch (HttpException | IOException | GeneralSecurityException e) { + + logger.debug("Failed to create topic {}", topicName, e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) { + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic"); + } + + return cambriaErrorResponse; + + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + return new CambriaErrorResponse(CambriaOperationStatus.OK); + + } + + public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { + CambriaTopicManager createTopicManager = null; + try { + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); + + if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { + createTopicManager.revokeProducer(topicName, subscriberApiKey); + } else { + createTopicManager.revokeConsumer(topicName, subscriberApiKey); + } + + } catch (HttpObjectNotFoundException | GeneralSecurityException e) { + logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); + return cambriaErrorResponse; + + } catch (HttpException | IOException e) { + logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase()); + + return cambriaErrorResponse; + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); + return cambriaErrorResponse; + } + + /** + * + * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER + * + * @param hostSet + * @param managerApiKey + * @param managerSecretKey + * @param subscriberApiKey + * @param subscriberTypeEnum + * @param topicName + * @return + */ + public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum, String topicName) { + + CambriaTopicManager createTopicManager = null; + try { + createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); + + if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { + createTopicManager.allowProducer(topicName, subscriberApiKey); + } else { + createTopicManager.allowConsumer(topicName, subscriberApiKey); + } + + } catch (HttpObjectNotFoundException | GeneralSecurityException e) { + logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); + return cambriaErrorResponse; + + } catch (HttpException | IOException e) { + logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase()); + + return cambriaErrorResponse; + } finally { + if (createTopicManager != null) { + createTopicManager.close(); + } + } + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); + return cambriaErrorResponse; + } + + /** + * create and retrieve a Cambria Consumer for a specific topic + * + * @param hostSet + * @param topicName + * @param apiKey + * @param secretKey + * @param consumerId + * @param consumerGroup + * @param timeoutMS + * @return + * @throws Exception + */ + public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { + + CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHosts(hostSet).waitAtServer(timeoutMS).build(); + consumer.setApiCredentials(apiKey, secretKey); + return consumer; + } + + public void closeConsumer(CambriaConsumer consumer) { + + if (consumer != null) { + consumer.close(); + } + + } + + /** + * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error) + * + * @param topicConsumer + * @return + */ + public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) { + + try { + Iterable<String> messages = topicConsumer.fetch(); + if (messages == null) { + messages = new ArrayList<String>(); + } + return Either.left(messages); + + } catch (IOException e) { + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + CambriaErrorResponse cambriaErrorResponse = processError(e); + + logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage()); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic"); + + return Either.right(cambriaErrorResponse); + + } catch (Exception e) { + logger.debug("Failed to fetch from U-EB topic", e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + + CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR); + return Either.right(cambriaErrorResponse); + } + } + + /** + * Publish notification message to a given queue + * + * @param topicName + * @param uebPublicKey + * @param uebSecretKey + * @param uebServers + * @param data + * @return + */ + public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) { + + CambriaBatchingPublisher createSimplePublisher = null; + + try { + + String json = gson.toJson(data); + logger.trace("Before sending notification data {} to topic {}", json, topicName); + + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - private static final String PARTITION_KEY = "asdc" + "aa"; - - private final String SEND_NOTIFICATION = "send notification"; - - private Gson gson = new Gson(); - - public static boolean useHttpsWithDmaap = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().isUseHttpsWithDmaap(); + int result = createSimplePublisher.send(PARTITION_KEY, json); + try { + Thread.sleep(1 * 1000); + } catch (InterruptedException e) { + logger.debug("Failed during sleep after sending the message.", e); + } - /** - * process the response error from Cambria client - * - * @param message - * @return - */ - private Integer processMessageException(String message) { - - String[] patterns = { "(HTTP Status )(\\d\\d\\d)", "(HTTP/\\d.\\d )(\\d\\d\\d)" }; - - Integer result = checkPattern(patterns[0], message, 2); - if (result != null) { - return result; - } - result = checkPattern(patterns[1], message, 2); - - return result; - - } - - /** - * check whether the message has a match with a given pattern inside it - * - * @param patternStr - * @param message - * @param groupIndex - * @return - */ - private Integer checkPattern(String patternStr, String message, int groupIndex) { - Integer result = null; - - Pattern pattern = Pattern.compile(patternStr); - Matcher matcher = pattern.matcher(message); - boolean find = matcher.find(); - if (find) { - String httpCode = matcher.group(groupIndex); - if (httpCode != null) { - try { - result = Integer.valueOf(httpCode); - } catch (NumberFormatException e) { - logger.debug("Failed to parse http code {}", httpCode); - } - } - } - return result; - } - - /** - * retrieve all topics from U-EB server - * - * @param hostSet - * @return - */ - public Either<Set<String>, CambriaErrorResponse> getTopics(List<String> hostSet) { - - CambriaTopicManager createTopicManager = null; - try { - - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet)); - - Set<String> topics = createTopicManager.getTopics(); - - if (topics == null || true == topics.isEmpty()) { - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND, null); - return Either.right(cambriaErrorResponse); - } - - return Either.left(topics); - - } catch (IOException | GeneralSecurityException e) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - logger.debug("Failed to fetch topics from U-EB server", e); - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get topics"); - - return Either.right(cambriaErrorResponse); - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - } - - /** - * process the error message from Cambria client. - * - * set Cambria status and http code in case we succeed to fetch it - * - * @return - */ - private CambriaErrorResponse processError(Exception e) { - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(); - - Integer httpCode = processMessageException(e.getMessage()); - - if (httpCode != null) { - cambriaErrorResponse.setHttpCode(httpCode); - switch (httpCode.intValue()) { - - case 401: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.AUTHENTICATION_ERROR); - break; - case 409: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.TOPIC_ALREADY_EXIST); - break; - case 500: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.INTERNAL_SERVER_ERROR); - break; - default: - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); - } - } else { - - boolean found = false; - Throwable throwable = e.getCause(); - if (throwable != null) { - String message = throwable.getMessage(); - - Throwable cause = throwable.getCause(); - - if (cause != null) { - Class<?> clazz = cause.getClass(); - String className = clazz.getName(); - if (className.endsWith("UnknownHostException")) { - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.UNKNOWN_HOST_ERROR); - cambriaErrorResponse.addVariable(message); - found = true; - } - } - } - - if (false == found) { - cambriaErrorResponse.setOperationStatus(CambriaOperationStatus.CONNNECTION_ERROR); - cambriaErrorResponse.setHttpCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); - } - } - - return cambriaErrorResponse; - } - - /** - * write the error to the log - * - * @param cambriaErrorResponse - * @param errorMessage - * @param methodName - * @param operationDesc - */ - private void writeErrorToLog(CambriaErrorResponse cambriaErrorResponse, String errorMessage, String methodName, String operationDesc) { - - String httpCode = (cambriaErrorResponse.getHttpCode() == null ? "" : String.valueOf(cambriaErrorResponse.getHttpCode())); - - switch (cambriaErrorResponse.getOperationStatus()) { - case UNKNOWN_HOST_ERROR: - String hostname = cambriaErrorResponse.getVariables().get(0); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebUnkownHostError, methodName, hostname); - BeEcompErrorManager.getInstance().logBeUebUnkownHostError(methodName, httpCode); - break; - case AUTHENTICATION_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebAuthenticationError, methodName, httpCode); - BeEcompErrorManager.getInstance().logBeUebAuthenticationError(methodName, httpCode); - break; - case CONNNECTION_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebConnectionError, methodName, httpCode); - BeEcompErrorManager.getInstance().logBeUebConnectionError(methodName, httpCode); - break; - - case INTERNAL_SERVER_ERROR: - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, methodName, operationDesc); - BeEcompErrorManager.getInstance().logBeUebSystemError(methodName, operationDesc); - break; - default: - break; - } - - } - - /** - * create a topic if it does not exists in the topicsList - * - * @param hostSet - * - list of U-EB servers - * @param apiKey - * @param secretKey - * @param topicName - * - topic to create - * @param partitionCount - * @param replicationCount - * @return - */ - public CambriaErrorResponse createTopic(Collection<String> hostSet, String apiKey, String secretKey, String topicName, int partitionCount, int replicationCount) { - - CambriaTopicManager createTopicManager = null; - try { - - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(apiKey, secretKey)); - - createTopicManager.createTopic(topicName, "ASDC distribution notification topic", partitionCount, replicationCount); - - } catch (HttpException | IOException | GeneralSecurityException e) { - - logger.debug("Failed to create topic {}", topicName, e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - if (cambriaErrorResponse.getOperationStatus() != CambriaOperationStatus.TOPIC_ALREADY_EXIST) { - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "create topic"); - } - - return cambriaErrorResponse; - - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - return new CambriaErrorResponse(CambriaOperationStatus.OK); - - } - - public CambriaErrorResponse unRegisterFromTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { - CambriaTopicManager createTopicManager = null; - try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); - - if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { - createTopicManager.revokeProducer(topicName, subscriberApiKey); - } else { - createTopicManager.revokeConsumer(topicName, subscriberApiKey); - } - - } catch (HttpObjectNotFoundException | GeneralSecurityException e) { - logger.debug("Failed to unregister {} from topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); - return cambriaErrorResponse; - - } catch (HttpException | IOException e) { - logger.debug("Failed to unregister {} from topic {} as producer", managerApiKey, topicName, e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "unregister from topic as " + subscriberTypeEnum.toString().toLowerCase()); - - return cambriaErrorResponse; - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); - return cambriaErrorResponse; - } - - /** - * - * register a public key (subscriberId) to a given topic as a CONSUMER or PRODUCER - * - * @param hostSet - * @param topicName - * @param managerApiKey - * @param managerSecretKey - * @param subscriberApiKey - * @param subscriberTypeEnum - * @return - */ - public CambriaErrorResponse registerToTopic(Collection<String> hostSet, String topicName, String managerApiKey, String managerSecretKey, String subscriberApiKey, SubscriberTypeEnum subscriberTypeEnum) { - - CambriaTopicManager createTopicManager = null; - try { - createTopicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(hostSet).authenticatedBy(managerApiKey, managerSecretKey)); - - if (subscriberTypeEnum == SubscriberTypeEnum.PRODUCER) { - createTopicManager.allowProducer(topicName, subscriberApiKey); - } else { - createTopicManager.allowConsumer(topicName, subscriberApiKey); - } - - } catch (HttpObjectNotFoundException | GeneralSecurityException e) { - logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebObjectNotFoundError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeUebObjectNotFoundError(methodName, e.getMessage()); - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OBJECT_NOT_FOUND, HttpStatus.SC_NOT_FOUND); - return cambriaErrorResponse; - - } catch (HttpException | IOException e) { - logger.debug("Failed to register {} to topic {} as {}", managerApiKey, topicName, subscriberTypeEnum.toString().toLowerCase(), e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "register to topic as " + subscriberTypeEnum.toString().toLowerCase()); - - return cambriaErrorResponse; - } finally { - if (createTopicManager != null) { - createTopicManager.close(); - } - } - - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK, HttpStatus.SC_OK); - return cambriaErrorResponse; - } - - /** - * create and retrieve a Cambria Consumer for a specific topic - * - * @param hostSet - * @param topicName - * @param apiKey - * @param secretKey - * @param consumerId - * @param consumerGroup - * @param timeoutMS - * @return - * @throws Exception - */ - public CambriaConsumer createConsumer(Collection<String> hostSet, String topicName, String apiKey, String secretKey, String consumerId, String consumerGroup, int timeoutMS) throws Exception { - - CambriaConsumer consumer = new ConsumerBuilder().authenticatedBy(apiKey, secretKey).knownAs(consumerGroup, consumerId).onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(hostSet).withSocketTimeout(timeoutMS).build(); - consumer.setApiCredentials(apiKey, secretKey); - return consumer; - } - - public void closeConsumer(CambriaConsumer consumer) { - - if (consumer != null) { - consumer.close(); - } - - } - - /** - * use the topicConsumer to fetch messages from topic. in case no messages were fetched, empty ArrayList will be returned (not error) - * - * @param topicConsumer - * @return - */ - public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(CambriaConsumer topicConsumer) { - - try { - Iterable<String> messages = topicConsumer.fetch(); - if (messages == null) { - messages = new ArrayList<String>(); - } - return Either.left(messages); - - } catch (IOException e) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - CambriaErrorResponse cambriaErrorResponse = processError(e); - - logger.debug("Failed to fetch from U-EB topic. error={}", e.getMessage()); - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, "get messages from topic"); - - return Either.right(cambriaErrorResponse); - - } catch (Exception e) { - logger.debug("Failed to fetch from U-EB topic", e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, HttpStatus.SC_INTERNAL_SERVER_ERROR); - return Either.right(cambriaErrorResponse); - } - } + return response; - /** - * Publish notification message to a given queue - * - * @param topicName - * @param uebPublicKey - * @param uebSecretKey - * @param uebServers - * @param data - * @return - */ - public CambriaErrorResponse sendNotification(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data) { + } catch (IOException | GeneralSecurityException e) { + logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); - CambriaBatchingPublisher createSimplePublisher = null; + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); - try { + CambriaErrorResponse cambriaErrorResponse = processError(e); - String json = gson.toJson(data); - logger.trace("Before sending notification data {} to topic {}", json, topicName); + writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); - createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - - int result = createSimplePublisher.send(PARTITION_KEY, json); - - try { - Thread.sleep(1 * 1000); - } catch (InterruptedException e) { - logger.debug("Failed during sleep after sending the message.", e); - } - - logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - - CambriaErrorResponse response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - - return response; + return cambriaErrorResponse; + } finally { + if (createSimplePublisher != null) { + logger.debug("Before closing publisher"); + createSimplePublisher.close(); + logger.debug("After closing publisher"); + } + } + } - } catch (IOException | GeneralSecurityException e) { - logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); + public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) { - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + CambriaBatchingPublisher createSimplePublisher = null; - CambriaErrorResponse cambriaErrorResponse = processError(e); + CambriaErrorResponse response = null; + try { - writeErrorToLog(cambriaErrorResponse, e.getMessage(), methodName, SEND_NOTIFICATION); + String json = gson.toJson(data); + logger.debug("Before sending notification data {} to topic {}", json, topicName); - return cambriaErrorResponse; - } finally { - if (createSimplePublisher != null) { - logger.debug("Before closing publisher"); - createSimplePublisher.close(); - logger.debug("After closing publisher"); - } - } - } + createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHosts(uebServers).build(); + createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); - private String convertListToString(List<String> list) { - StringBuilder builder = new StringBuilder(); + int result = createSimplePublisher.send(PARTITION_KEY, json); - if (list != null) { - for (int i = 0; i < list.size(); i++) { - builder.append(list.get(i)); - if (i < list.size() - 1) { - builder.append(","); - } - } - } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.debug("Failed during sleep after sending the message.", e); + } - return builder.toString(); - } + logger.debug("After sending notification data to topic {}. result is {}", topicName, result); - public CambriaErrorResponse sendNotificationAndClose(String topicName, String uebPublicKey, String uebSecretKey, List<String> uebServers, INotificationData data, long waitBeforeCloseTimeout) { + } catch (IOException | GeneralSecurityException e) { + logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); - CambriaBatchingPublisher createSimplePublisher = null; + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); - CambriaErrorResponse response = null; - try { + response = processError(e); - String json = gson.toJson(data); - logger.debug("Before sending notification data {} to topic {}", json, topicName); + writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION); - createSimplePublisher = new PublisherBuilder().onTopic(topicName).usingHttps(useHttpsWithDmaap).usingHosts(uebServers).build(); - createSimplePublisher.setApiCredentials(uebPublicKey, uebSecretKey); + return response; - int result = createSimplePublisher.send(PARTITION_KEY, json); + } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.debug("Failed during sleep after sending the message.", e); - } + logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout); + try { + List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS); + if (messagesInQ != null && false == messagesInQ.isEmpty()) { + logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size()); + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); + } else { + logger.debug("No message left in the queue after closing cambria publisher"); + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + } + } catch (IOException | InterruptedException e) { + logger.debug("Failed to close cambria publisher", e); + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); + } + logger.debug("After closing publisher"); - logger.debug("After sending notification data to topic {}. result is {}", topicName, result); + return response; - } catch (IOException | GeneralSecurityException e) { - logger.debug("Failed to send notification {} to topic {} ", data, topicName, e); + } - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); + public CambriaErrorResponse getApiKey(String server, String apiKey) { - response = processError(e); + CambriaErrorResponse response = null; - writeErrorToLog(response, e.getMessage(), methodName, SEND_NOTIFICATION); + List<String> hostSet = new ArrayList<>(); + hostSet.add(server); + try { + CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); + createIdentityManager.getApiKey(apiKey); - return response; + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - } + } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) { + logger.debug("Failed to fetch api key {} from server {}", apiKey, server, e); - logger.debug("Before closing publisher. Maximum timeout is {} seconds", waitBeforeCloseTimeout); - try { - List<message> messagesInQ = createSimplePublisher.close(waitBeforeCloseTimeout, TimeUnit.SECONDS); - if (messagesInQ != null && false == messagesInQ.isEmpty()) { - logger.debug("Cambria client returned {} non sent messages.", messagesInQ.size()); - response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); - } else { - logger.debug("No message left in the queue after closing cambria publisher"); - response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); - } - } catch (IOException | InterruptedException e) { - logger.debug("Failed to close cambria publisher", e); - response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - writeErrorToLog(response, "closing publisher returned non sent messages", methodName, SEND_NOTIFICATION); - } - logger.debug("After closing publisher"); + response = processError(e); - return response; + } - } + return response; + } - public CambriaErrorResponse getApiKey(String server, String apiKey) { + public Either<ApiCredential, CambriaErrorResponse> createUebKeys(List<String> hostSet) { + Either<ApiCredential, CambriaErrorResponse> result; - CambriaErrorResponse response = null; + try { + CambriaIdentityManager createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); - List<String> hostSet = new ArrayList<>(); - hostSet.add(server); - CambriaIdentityManager createIdentityManager = null; - try { - createIdentityManager = buildCambriaClient(new IdentityManagerBuilder().usingHosts(hostSet)); - createIdentityManager.getApiKey(apiKey); - response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + String description = String.format("ASDC Key for %s", CONSUMER_ID); + ApiCredential credential = createIdentityManager.createApiKey("", description); + createIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); + result = Either.left(credential); - } catch (HttpException | IOException | CambriaApiException | GeneralSecurityException e) { - logger.debug("Failed to fetch api key {} from server ", apiKey, server, e); + } catch (Exception e) { + logger.debug("Failed to create ueb keys for servers {}",hostSet, e); - response = processError(e); + result = Either.right(processError(e)); - } + } - return response; - } + return result; + } - private static <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { - if (useHttpsWithDmaap) { - client.usingHttps(); - } - return (T)client.build(); - } + @VisibleForTesting + <T extends CambriaClient> T buildCambriaClient(CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) throws MalformedURLException, GeneralSecurityException { + return (T)client.build(); + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java new file mode 100644 index 0000000000..94fff3cfaa --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DME2EndpointIteratorCreator.java @@ -0,0 +1,19 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.aft.dme2.api.DME2Exception; +import com.att.aft.dme2.api.DME2Manager; +import com.att.aft.dme2.iterator.DME2EndpointIterator; +import com.att.aft.dme2.iterator.factory.DME2EndpointIteratorFactory; + +import org.springframework.stereotype.Component; + +@Component +public class DME2EndpointIteratorCreator { + + public DME2EndpointIterator create(String lookupURI) throws DME2Exception { + // Initializing DME2Manager instance + DME2Manager manager = DME2Manager.getDefaultInstance(); + // Returning an instance of the DME2EndpointIteratorFactory + return (DME2EndpointIterator) DME2EndpointIteratorFactory.getInstance().getIterator(lookupURI, null, null, manager); + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java deleted file mode 100644 index 5e4c08275f..0000000000 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DeConfigurationStatus.java +++ /dev/null @@ -1,40 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.be.components.distribution.engine; - -public enum DeConfigurationStatus { - - OK(""), MISSING_CONFIGURATION(""); - - private String description; - - DeConfigurationStatus(String description) { - this.description = description; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } -} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java index bd3d74e323..7d2d4680b5 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java @@ -20,361 +20,340 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.sdc.be.components.validation.ServiceDistributionValidation; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.dao.api.ActionStatus; import org.openecomp.sdc.be.model.Service; import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus; -import org.openecomp.sdc.common.config.EcompErrorName; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; import org.openecomp.sdc.common.util.YamlToObjectConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import fj.data.Either; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; @Component("distributionEngine") public class DistributionEngine implements IDistributionEngine { - public static final Pattern FQDN_PATTERN = Pattern.compile("^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,4})*$", Pattern.CASE_INSENSITIVE); - - public static void main(String[] args) { - - List<String> servers = new ArrayList<>(); - - servers.add("uebsb91kcdc.it.att.com:3904"); - servers.add("uebsb91kcdc.it.att.com:3904"); - servers.add("uebsb91kcdc.it.att.com:3904"); - - YamlToObjectConverter converter = new YamlToObjectConverter(); - DistributionEngineConfiguration distributionEngineConfiguration = converter.convert("src/test/resources/config/catalog-be/distribEngine1/distribution-engine-configuration.yaml", DistributionEngineConfiguration.class); - - DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(2l, distributionEngineConfiguration, "PROD", new AtomicBoolean(false), null, null); - distributionEngineInitTask.startTask(); - - } - - @javax.annotation.Resource - private ComponentsUtils componentUtils; - - @javax.annotation.Resource - private DistributionNotificationSender distributionNotificationSender; - - @javax.annotation.Resource - private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder; - - @javax.annotation.Resource - private DistributionEngineClusterHealth distributionEngineClusterHealth; - - private static Logger logger = LoggerFactory.getLogger(DistributionEngine.class.getName()); - - private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<String, DistributionEngineInitTask>(); - private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<String, DistributionEnginePollingTask>(); - - private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<String, AtomicBoolean>(); - - @Override - public boolean isActive() { - - if (true == envNamePerInitTask.isEmpty()) { - return false; - } - - for (DistributionEngineInitTask task : envNamePerInitTask.values()) { - boolean active = task.isActive(); - if (active == false) { - return false; - } - } - return true; - } - - @PostConstruct - private void init() { - - logger.trace("Enter init method of DistributionEngine"); - - DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); - - boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine(); - logger.debug("Distribution engine activation parameter is {}", startDistributionEngine); - if (false == startDistributionEngine) { - logger.info("The disribution engine is disabled"); - - this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled(); - - return; - } - - boolean isValidConfig = validateConfiguration(distributionEngineConfiguration); - - if (false == isValidConfig) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase"); - BeEcompErrorManager.getInstance().logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase"); - - this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError(); - return; - } - - List<String> environments = distributionEngineConfiguration.getEnvironments(); - - for (String envName : environments) { - - DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(distributionEngineConfiguration, envName, componentUtils, distributionEngineClusterHealth); - - logger.debug("Init task for environment {}", envName); - - AtomicBoolean status = new AtomicBoolean(false); - envNamePerStatus.put(envName, status); - DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l, distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask); - distributionEngineInitTask.startTask(); - envNamePerInitTask.put(envName, distributionEngineInitTask); - envNamePerPollingTask.put(envName, distributionEnginePollingTask); - } - - logger.debug("Init UEB health check"); - distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus); - - logger.trace("Exit init method of DistributionEngine"); - - } - - @PreDestroy - public void shutdown() { - logger.info("distribution engine shutdown - start"); - if (envNamePerInitTask != null) { - for (DistributionEngineInitTask task : envNamePerInitTask.values()) { - task.destroy(); - } - } - if (envNamePerPollingTask != null) { - for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) { - task.destroy(); - } - } - - } - - /** - * validate mandatory configuration parameters received - * - * @param deConfiguration - * @return - */ - protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) { - - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - boolean result = true; - result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result; - result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments") && result; - result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result; - result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result; - result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result; - result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result; - result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result; - result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result; - result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result; - result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result; - result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result; - result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result; - result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result; - result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result; - - return result; - } - - private boolean isValidServers(List<String> uebServers, String methodName, String paramName) { - - if (uebServers == null || uebServers.size() == 0) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName); - BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); - return false; - } - - if (uebServers.size() < 2) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeConfigurationInvalidListSizeError, methodName, paramName, "2"); - BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2); - return false; - } - - for (String serverFqdn : uebServers) { - if (false == isValidFqdn(serverFqdn)) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeInvalidConfigurationError, methodName, paramName, serverFqdn); - BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn); - return false; - } - } - - return true; - } - - private boolean isValidFqdn(String serverFqdn) { - - try { - Matcher matcher = FQDN_PATTERN.matcher(serverFqdn); - return matcher.matches(); - - } catch (Exception e) { - logger.debug("Failed to match value of address {}", serverFqdn, e); - return false; - } - - } - - private boolean isEmptyParam(String param) { - - if (param == null || true == param.isEmpty()) { - return true; - } - - return false; - } - - private boolean isValidParam(String paramValue, String methodName, String paramName) { - - if (isEmptyParam(paramValue)) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName); - BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); - return false; - } - return true; - - } - - private boolean isValidParam(List<String> paramValue, String methodName, String paramName) { - - if (isEmptyList(paramValue)) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName); - BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); - return false; - } - return true; - - } - - private boolean isValidObject(Object paramValue, String methodName, String paramName) { - - if (paramValue == null) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeMissingConfigurationError, methodName, paramName); - BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); - return false; - } - return true; - - } - - private boolean isEmptyList(List<String> list) { - if (list == null || true == list.isEmpty()) { - return true; - } - return false; - } - - private String getEnvironmentErrorDescription(StorageOperationStatus status) { - - switch (status) { - case DISTR_ENVIRONMENT_NOT_AVAILABLE: - return "environment is unavailable"; - case DISTR_ENVIRONMENT_NOT_FOUND: - return "environment is not configured in our system"; - case DISTR_ENVIRONMENT_SENT_IS_INVALID: - return "environment name is invalid"; - - default: - return "unkhown"; - - } - } - - public StorageOperationStatus isEnvironmentAvailable(String envName) { - - if (envName == null || true == envName.isEmpty()) { - - return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID; - } - - AtomicBoolean status = envNamePerStatus.get(envName); - if (status == null) { - return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND; - } - - if (false == status.get()) { - return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE; - } - return StorageOperationStatus.OK; - } + private static final Logger LOGGER = LoggerFactory.getLogger(DistributionEngine.class); + private static final Pattern FQDN_PATTERN = Pattern.compile("^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,4})*$", Pattern.CASE_INSENSITIVE); - public StorageOperationStatus isEnvironmentAvailable() { + @Autowired + private EnvironmentsEngine environmentsEngine; - String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0); + @Resource + private DistributionNotificationSender distributionNotificationSender; - return isEnvironmentAvailable(envName); - } + @Resource + private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder; - @Override - public void disableEnvironment(String envName) { - // TODO disable tasks - AtomicBoolean status = envNamePerStatus.get(envName); - status.set(false); - } + @Resource + private DistributionEngineClusterHealth distributionEngineClusterHealth; - @Override - public StorageOperationStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName) { + @Resource + private ServiceDistributionValidation serviceDistributionValidation; - logger.debug("Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", distributionId, service.getUUID(), service.getUniqueId(), envName, userId, modifierName); + private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>(); + private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>(); + private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>(); - DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + /** + * The main method for testing only + * @param args + */ + public static void main(String[] args) { - String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName(); - String topicName = DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName); + List<String> servers = new ArrayList<>(); + String server = "uebsb91kcdc.it.att.com:3904"; + servers.add(server); + servers.add(server); + servers.add(server); - StorageOperationStatus sendNotification = distributionNotificationSender.sendNotification(topicName, distributionId, deConfiguration, envName, notificationData, service, userId, modifierName); + YamlToObjectConverter converter = new YamlToObjectConverter(); + DistributionEngineConfiguration distributionEngineConfiguration = converter.convert("src/test/resources/config/catalog-be/distribEngine1/distribution-engine-configuration.yaml", DistributionEngineConfiguration.class); - logger.debug("Finish notifyService. status is {}", sendNotification); + DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(2l, distributionEngineConfiguration, "PROD", new AtomicBoolean(false), null, null, null); + distributionEngineInitTask.startTask(); + + } - return sendNotification; - } + @Override + public boolean isActive() { - @Override - public Either<INotificationData, StorageOperationStatus> isReadyForDistribution(Service service, String distributionId, String envName) { - StorageOperationStatus status = isEnvironmentAvailable(envName); - if (status != StorageOperationStatus.OK) { - String envErrorDec = getEnvironmentErrorDescription(status); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, - "Environment name " + envName + " is not available. Reason : " + envErrorDec); - BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, "Environment name " + envName + " is not available. Reason : " + envErrorDec); - return Either.right(status); - } + if (envNamePerInitTask.isEmpty()) { + return false; + } - Either<Boolean, StorageOperationStatus> isServiceContainsDeploymentArtifactsStatus = serviceDistributionArtifactsBuilder.isServiceContainsDeploymentArtifacts(service); - if (isServiceContainsDeploymentArtifactsStatus.isRight()) { - StorageOperationStatus operationStatus = isServiceContainsDeploymentArtifactsStatus.right().value(); - return Either.right(operationStatus); - } else { - Boolean isDeploymentArtifactExists = isServiceContainsDeploymentArtifactsStatus.left().value(); - if (isDeploymentArtifactExists == null || isDeploymentArtifactExists.booleanValue() == false) { - return Either.right(StorageOperationStatus.DISTR_ARTIFACT_NOT_FOUND); - } - } + for (DistributionEngineInitTask task : envNamePerInitTask.values()) { + boolean active = task.isActive(); + if (!active) { + return false; + } + } + return true; + } - INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId); - value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service); + @PostConstruct + private void init() { + + LOGGER.trace("Enter init method of DistributionEngine"); + + DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + + boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine(); + LOGGER.debug("Distribution engine activation parameter is {}", startDistributionEngine); + if (!startDistributionEngine) { + LOGGER.info("The disribution engine is disabled"); - return Either.left(value); - } + this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled(); + + return; + } + + boolean isValidConfig = validateConfiguration(distributionEngineConfiguration); + + if (!isValidConfig) { + BeEcompErrorManager.getInstance().logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase"); + + this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError(); + return; + } + + List<String> environments = distributionEngineConfiguration.getEnvironments(); + + for (String envName : environments) { + LOGGER.debug("init task for environment {}", envName); + AtomicBoolean status = new AtomicBoolean(false); + envNamePerStatus.put(envName, status); + environmentsEngine.connectUebTopicForDistributionConfTopic(envName, status, envNamePerInitTask, envNamePerPollingTask); + } + + LOGGER.debug("init UEB health check"); + distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus); + + LOGGER.trace("Exit init method of DistributionEngine"); + + } + + @PreDestroy + public void shutdown() { + LOGGER.info("distribution engine shutdown - start"); + if (envNamePerInitTask != null) { + for (DistributionEngineInitTask task : envNamePerInitTask.values()) { + task.destroy(); + } + } + if (envNamePerPollingTask != null) { + for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) { + task.destroy(); + } + } + + } + + /** + * validate mandatory configuration parameters received + * + * @param deConfiguration + * @return + */ + protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) { + + String methodName = "validateConfiguration"; + + boolean result = true; + result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result; + result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments") && result; + result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result; + result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result; + result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result; + result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result; + result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result; + result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result; + result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result; + result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result; + result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result; + result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result; + result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result; + result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result; + + return result; + } + + private boolean isValidServers(List<String> uebServers, String methodName, String paramName) { + + if (uebServers == null || uebServers.isEmpty()) { + BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); + return false; + } + + if (uebServers.size() < 2) { + BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2); + return false; + } + + for (String serverFqdn : uebServers) { + if (!isValidFqdn(serverFqdn)) { + BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn); + return false; + } + } + + return true; + } + + private boolean isValidFqdn(String serverFqdn) { + + try { + Matcher matcher = FQDN_PATTERN.matcher(serverFqdn); + return matcher.matches(); + + } catch (Exception e) { + LOGGER.debug("Failed to match value of address {}", serverFqdn, e); + return false; + } + } + + private boolean isValidParam(String paramValue, String methodName, String paramName) { + + if (StringUtils.isEmpty(paramValue)) { + BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); + return false; + } + return true; + } + + private boolean isValidParam(List<String> paramValue, String methodName, String paramName) { + + if (CollectionUtils.isEmpty(paramValue)) { + BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); + return false; + } + return true; + } + + private boolean isValidObject(Object paramValue, String methodName, String paramName) { + + if (paramValue == null) { + BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName); + return false; + } + return true; + + } + + private String getEnvironmentErrorDescription(StorageOperationStatus status) { + + switch (status) { + case DISTR_ENVIRONMENT_NOT_AVAILABLE: + return "environment is unavailable"; + case DISTR_ENVIRONMENT_NOT_FOUND: + return "environment is not configured in our system"; + case DISTR_ENVIRONMENT_SENT_IS_INVALID: + return "environment name is invalid"; + default: + return "unkhown"; + } + } + + @Override + public StorageOperationStatus isEnvironmentAvailable(String envName) { + + if (envName == null || envName.isEmpty()) { + + return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID; + } + + AtomicBoolean status = envNamePerStatus.get(envName); + if (status == null) { + return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND; + } + + if (!status.get()) { + return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE; + } + return StorageOperationStatus.OK; + } + + @Override + public StorageOperationStatus isEnvironmentAvailable() { + + String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0); + + return isEnvironmentAvailable(envName); + } + + @Override + public void disableEnvironment(String envName) { + AtomicBoolean status = envNamePerStatus.get(envName); + status.set(false); + } + + @Override + public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName) { + return notifyService(distributionId, service, notificationData, envName, envName, userId, modifierName); + } + + @Override + public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName, String userId, String modifierName) { + LOGGER.debug("Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", distributionId, service.getUUID(), service.getUniqueId(), envName, userId, modifierName); + String topicName = buildTopicName(envName); + ActionStatus notifyServiceStatus = Optional.ofNullable(environmentsEngine.getEnvironmentById(envId)) + .map(EnvironmentMessageBusData::new) + .map(messageBusData -> distributionNotificationSender.sendNotification(topicName, distributionId, messageBusData, notificationData, service, userId, modifierName)) + .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE); + LOGGER.debug("Finish notifyService. status is {}", notifyServiceStatus); + return notifyServiceStatus; + } + + private String buildTopicName(String envName) { + DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName(); + return DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName); + } + + @Override + public StorageOperationStatus isReadyForDistribution(Service service, String envName) { + StorageOperationStatus status = isEnvironmentAvailable(envName); + if (status != StorageOperationStatus.OK) { + String envErrorDec = getEnvironmentErrorDescription(status); + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, "Environment name " + envName + " is not available. Reason : " + envErrorDec); + return status; + } + + return verifyServiceHasDeploymentArtifacts(service); + } + + @Override + public StorageOperationStatus verifyServiceHasDeploymentArtifacts(Service service) { + if (!serviceDistributionArtifactsBuilder.verifyServiceContainsDeploymentArtifacts(service)) { + return StorageOperationStatus.DISTR_ARTIFACT_NOT_FOUND; + } + return StorageOperationStatus.OK; + } + + @Override + public OperationalEnvironmentEntry getEnvironmentById(String opEnvId) { + return environmentsEngine.getEnvironmentById(opEnvId); + } + + @Override + public INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext) { + INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId, workloadContext); + value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service); + return value; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java index 85a868f156..9599879006 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineClusterHealth.java @@ -20,22 +20,6 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; @@ -47,311 +31,318 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + @Component("distribution-engine-cluster-health") public class DistributionEngineClusterHealth { - protected static String UEB_HEALTH_LOG_CONTEXT = "ueb.healthcheck"; + protected static String UEB_HEALTH_LOG_CONTEXT = "ueb.healthcheck"; - private static Logger healthLogger = LoggerFactory.getLogger(UEB_HEALTH_LOG_CONTEXT); + private static final Logger healthLogger = LoggerFactory.getLogger(UEB_HEALTH_LOG_CONTEXT); - private static final String UEB_HEALTH_CHECK_STR = "uebHealthCheck"; + private static final String UEB_HEALTH_CHECK_STR = "uebHealthCheck"; - boolean lastHealthState = false; + boolean lastHealthState = false; - Object lockOject = new Object(); + Object lockOject = new Object(); - private long reconnectInterval = 5; + private long reconnectInterval = 5; - private long healthCheckReadTimeout = 20; + private long healthCheckReadTimeout = 20; - private static Logger logger = LoggerFactory.getLogger(DistributionEngineClusterHealth.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(DistributionEngineClusterHealth.class); - private List<String> uebServers = null; + private List<String> uebServers = null; - private String publicApiKey = null; + private String publicApiKey = null; - public enum HealthCheckInfoResult { + public enum HealthCheckInfoResult { - OK(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.UP, null, ClusterStatusDescription.OK.getDescription())), - UNAVAILABLE(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.UNAVAILABLE.getDescription())), - NOT_CONFIGURED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.NOT_CONFIGURED.getDescription())), - DISABLED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.DISABLED.getDescription())); + OK(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.UP, null, ClusterStatusDescription.OK.getDescription())), + UNAVAILABLE(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.UNAVAILABLE.getDescription())), + NOT_CONFIGURED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.NOT_CONFIGURED.getDescription())), + DISABLED(new HealthCheckInfo(Constants.HC_COMPONENT_DISTRIBUTION_ENGINE, HealthCheckStatus.DOWN, null, ClusterStatusDescription.DISABLED.getDescription())); - private HealthCheckInfo healthCheckInfo; + private HealthCheckInfo healthCheckInfo; - HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) { - this.healthCheckInfo = healthCheckInfo; - } + HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) { + this.healthCheckInfo = healthCheckInfo; + } - public HealthCheckInfo getHealthCheckInfo() { - return healthCheckInfo; - } + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } - } + } - private HealthCheckInfo healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); + private HealthCheckInfo healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); - private Map<String, AtomicBoolean> envNamePerStatus = null; + private Map<String, AtomicBoolean> envNamePerStatus = null; - private ScheduledFuture<?> scheduledFuture = null; + private ScheduledFuture<?> scheduledFuture = null; - ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "UEB-Health-Check-Task"); - } - }); + ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "UEB-Health-Check-Task"); + } + }); - HealthCheckScheduledTask healthCheckScheduledTask = null; + HealthCheckScheduledTask healthCheckScheduledTask = null; - public enum ClusterStatusDescription { + public enum ClusterStatusDescription { - OK("OK"), UNAVAILABLE("U-EB cluster is not available"), NOT_CONFIGURED("U-EB cluster is not configured"), DISABLED("DE is disabled in configuration"); + OK("OK"), UNAVAILABLE("U-EB cluster is not available"), NOT_CONFIGURED("U-EB cluster is not configured"), DISABLED("DE is disabled in configuration"); - private String desc; + private String desc; - ClusterStatusDescription(String desc) { - this.desc = desc; - } + ClusterStatusDescription(String desc) { + this.desc = desc; + } - public String getDescription() { - return desc; - } + public String getDescription() { + return desc; + } - } + } - /** - * Health Check Task Scheduler. - * - * It schedules a task which send a apiKey get query towards the UEB servers. In case a query to the first UEB server is failed, then a second query is sent to the next UEB server. - * - * - * @author esofer - * - */ - public class HealthCheckScheduledTask implements Runnable { + /** + * Health Check Task Scheduler. + * + * It schedules a task which send a apiKey get query towards the UEB servers. In case a query to the first UEB server is failed, then a second query is sent to the next UEB server. + * + * + * @author esofer + * + */ + public class HealthCheckScheduledTask implements Runnable { - List<UebHealthCheckCall> healthCheckCalls = new ArrayList<>(); + List<UebHealthCheckCall> healthCheckCalls = new ArrayList<>(); - public HealthCheckScheduledTask(List<String> uebServers) { + public HealthCheckScheduledTask(List<String> uebServers) { - logger.debug("Create health check calls for servers {}", uebServers); - if (uebServers != null) { - for (String server : uebServers) { - healthCheckCalls.add(new UebHealthCheckCall(server, publicApiKey)); - } - } - } + logger.debug("Create health check calls for servers {}", uebServers); + if (uebServers != null) { + for (String server : uebServers) { + healthCheckCalls.add(new UebHealthCheckCall(server, publicApiKey)); + } + } + } - @Override - public void run() { + @Override + public void run() { - healthLogger.trace("Executing UEB Health Check Task - Start"); + healthLogger.trace("Executing UEB Health Check Task - Start"); - boolean healthStatus = verifyAtLeastOneEnvIsUp(); + boolean healthStatus = verifyAtLeastOneEnvIsUp(); - if (true == healthStatus) { - boolean queryUebStatus = queryUeb(); - if (queryUebStatus == lastHealthState) { - return; - } + if (true == healthStatus) { + boolean queryUebStatus = queryUeb(); + if (queryUebStatus == lastHealthState) { + return; + } - synchronized (lockOject) { - if (queryUebStatus != lastHealthState) { - logger.trace("UEB Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus); - lastHealthState = queryUebStatus; - logAlarm(lastHealthState); - if (true == queryUebStatus) { - healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo(); - } else { - healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); - } - } - } - } else { - healthLogger.trace("Not all UEB Environments are up"); - } - - } - - /** - * verify that at least one environment is up. - * - */ - private boolean verifyAtLeastOneEnvIsUp() { - - boolean healthStatus = false; + synchronized (lockOject) { + if (queryUebStatus != lastHealthState) { + logger.trace("UEB Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus); + lastHealthState = queryUebStatus; + logAlarm(lastHealthState); + if (true == queryUebStatus) { + healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo(); + } else { + healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); + } + } + } + } else { + healthLogger.trace("Not all UEB Environments are up"); + } + + } + + /** + * verify that at least one environment is up. + * + */ + private boolean verifyAtLeastOneEnvIsUp() { + + boolean healthStatus = false; - if (envNamePerStatus != null) { - Collection<AtomicBoolean> values = envNamePerStatus.values(); - if (values != null) { - for (AtomicBoolean status : values) { - if (true == status.get()) { - healthStatus = true; - break; - } - } - } - } - - return healthStatus; - } - - /** - * executor for the query itself - */ - ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "UEB-Health-Check-Thread"); - } - }); - - /** - * go all UEB servers and send a get apiKeys query. In case a query is succeed, no query is sent to the rest of UEB servers. - * - * - * @return - */ - private boolean queryUeb() { - - Boolean result = false; - int retryNumber = 1; - for (UebHealthCheckCall healthCheckCall : healthCheckCalls) { - try { - - healthLogger.debug("Before running Health Check retry query number {} towards UEB server {}", retryNumber, healthCheckCall.getServer()); - - Future<Boolean> future = healthCheckExecutor.submit(healthCheckCall); - result = future.get(healthCheckReadTimeout, TimeUnit.SECONDS); - - healthLogger.debug("After running Health Check retry query number {} towards UEB server {}. Result is {}", retryNumber, healthCheckCall.getServer(), result); - - if (result != null && true == result.booleanValue()) { - break; - } - - } catch (Exception e) { - String message = e.getMessage(); - if (message == null) { - message = e.getClass().getName(); - } - healthLogger.debug("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message); - healthLogger.trace("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message, e); - } - retryNumber++; - - } - - return result; - - } - - public List<UebHealthCheckCall> getHealthCheckCalls() { - return healthCheckCalls; - } - - } - - @PostConstruct - private void init() { - - logger.trace("Enter init method of DistributionEngineClusterHealth"); - - Long reconnectIntervalConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReconnectIntervalInSeconds(); - if (reconnectIntervalConfig != null) { - reconnectInterval = reconnectIntervalConfig.longValue(); - } - Long healthCheckReadTimeoutConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReadTimeout(); - if (healthCheckReadTimeoutConfig != null) { - healthCheckReadTimeout = healthCheckReadTimeoutConfig.longValue(); - } - - DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); - - this.uebServers = distributionEngineConfiguration.getUebServers(); - this.publicApiKey = distributionEngineConfiguration.getUebPublicKey(); - - this.healthCheckScheduledTask = new HealthCheckScheduledTask(this.uebServers); - - logger.trace("Exit init method of DistributionEngineClusterHealth"); - - } - - @PreDestroy - private void destroy() { - - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - scheduledFuture = null; - } - - if (healthCheckScheduler != null) { - healthCheckScheduler.shutdown(); - } - - } - - /** - * Start health check task. - * - * @param envNamePerStatus - * @param startTask - */ - public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus, boolean startTask) { - this.envNamePerStatus = envNamePerStatus; - - if (startTask == true && this.scheduledFuture == null) { - this.scheduledFuture = this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS); - } - } - - public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus) { - startHealthCheckTask(envNamePerStatus, true); - } - - private void logAlarm(boolean lastHealthState) { - if (lastHealthState == true) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeHealthCheckRecovery, UEB_HEALTH_CHECK_STR); - BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterRecovery(UEB_HEALTH_CHECK_STR); - } else { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeHealthCheckError, UEB_HEALTH_CHECK_STR); - BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterError(UEB_HEALTH_CHECK_STR); - } - } - - public HealthCheckInfo getHealthCheckInfo() { - return healthCheckInfo; - } - - /** - * change the health check to DISABLE - */ - public void setHealthCheckUebIsDisabled() { - healthCheckInfo = HealthCheckInfoResult.DISABLED.getHealthCheckInfo(); - } - - /** - * change the health check to NOT CONFGIURED - */ - public void setHealthCheckUebConfigurationError() { - healthCheckInfo = HealthCheckInfoResult.NOT_CONFIGURED.getHealthCheckInfo(); - } - - public void setHealthCheckOkAndReportInCaseLastStateIsDown() { - - if (lastHealthState == true) { - return; - } - synchronized (lockOject) { - if (lastHealthState == false) { - logger.debug("Going to update health check state to available"); - lastHealthState = true; - healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo(); - logAlarm(lastHealthState); - } - } - - } + if (envNamePerStatus != null) { + Collection<AtomicBoolean> values = envNamePerStatus.values(); + if (values != null) { + for (AtomicBoolean status : values) { + if (true == status.get()) { + healthStatus = true; + break; + } + } + } + } + + return healthStatus; + } + + /** + * executor for the query itself + */ + ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "UEB-Health-Check-Thread"); + } + }); + + /** + * go all UEB servers and send a get apiKeys query. In case a query is succeed, no query is sent to the rest of UEB servers. + * + * + * @return + */ + private boolean queryUeb() { + + Boolean result = false; + int retryNumber = 1; + for (UebHealthCheckCall healthCheckCall : healthCheckCalls) { + try { + + healthLogger.debug("Before running Health Check retry query number {} towards UEB server {}", retryNumber, healthCheckCall.getServer()); + + Future<Boolean> future = healthCheckExecutor.submit(healthCheckCall); + result = future.get(healthCheckReadTimeout, TimeUnit.SECONDS); + + healthLogger.debug("After running Health Check retry query number {} towards UEB server {}. Result is {}", retryNumber, healthCheckCall.getServer(), result); + + if (result != null && true == result.booleanValue()) { + break; + } + + } catch (Exception e) { + String message = e.getMessage(); + if (message == null) { + message = e.getClass().getName(); + } + healthLogger.debug("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message); + healthLogger.trace("Error occured during running Health Check retry query towards UEB server {}. Result is {}", healthCheckCall.getServer(), message, e); + } + retryNumber++; + + } + + return result; + + } + + public List<UebHealthCheckCall> getHealthCheckCalls() { + return healthCheckCalls; + } + + } + + @PostConstruct + protected void init() { + + logger.trace("Enter init method of DistributionEngineClusterHealth"); + + Long reconnectIntervalConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReconnectIntervalInSeconds(); + if (reconnectIntervalConfig != null) { + reconnectInterval = reconnectIntervalConfig.longValue(); + } + Long healthCheckReadTimeoutConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getUebHealthCheckReadTimeout(); + if (healthCheckReadTimeoutConfig != null) { + healthCheckReadTimeout = healthCheckReadTimeoutConfig.longValue(); + } + + DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + + this.uebServers = distributionEngineConfiguration.getUebServers(); + this.publicApiKey = distributionEngineConfiguration.getUebPublicKey(); + + this.healthCheckScheduledTask = new HealthCheckScheduledTask(this.uebServers); + + logger.trace("Exit init method of DistributionEngineClusterHealth"); + + } + + @PreDestroy + protected void destroy() { + + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + + if (healthCheckScheduler != null) { + healthCheckScheduler.shutdown(); + } + + } + + /** + * Start health check task. + * + * @param envNamePerStatus + * @param startTask + */ + public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus, boolean startTask) { + this.envNamePerStatus = envNamePerStatus; + + if (startTask == true && this.scheduledFuture == null) { + this.scheduledFuture = this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS); + } + } + + public void startHealthCheckTask(Map<String, AtomicBoolean> envNamePerStatus) { + startHealthCheckTask(envNamePerStatus, true); + } + + private void logAlarm(boolean lastHealthState) { + if (lastHealthState == true) { + BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterRecovery(UEB_HEALTH_CHECK_STR); + } else { + BeEcompErrorManager.getInstance().logBeHealthCheckUebClusterError(UEB_HEALTH_CHECK_STR); + } + } + + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } + + /** + * change the health check to DISABLE + */ + public void setHealthCheckUebIsDisabled() { + healthCheckInfo = HealthCheckInfoResult.DISABLED.getHealthCheckInfo(); + } + + /** + * change the health check to NOT CONFGIURED + */ + public void setHealthCheckUebConfigurationError() { + healthCheckInfo = HealthCheckInfoResult.NOT_CONFIGURED.getHealthCheckInfo(); + } + + public void setHealthCheckOkAndReportInCaseLastStateIsDown() { + + if (lastHealthState == true) { + return; + } + synchronized (lockOject) { + if (lastHealthState == false) { + logger.debug("Going to update health check state to available"); + lastHealthState = true; + healthCheckInfo = HealthCheckInfoResult.OK.getHealthCheckInfo(); + logAlarm(lastHealthState); + } + } + + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index 1eeaa1229e..1759f69b3e 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -20,274 +20,268 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - +import fj.data.Either; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.openecomp.sdc.common.config.EcompErrorName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import fj.data.Either; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; public class DistributionEngineInitTask implements Runnable { - public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine"; - public static final String ALREADY_EXISTS = "ALREADY_EXISTS"; - public static final String CONSUMER = "CONSUMER"; - public static final String PRODUCER = "PRODUCER"; - public static final String CREATED = "CREATED"; - public static final String FAILED = "FAILED"; - public static final Integer HTTP_OK = 200; - - private Long delayBeforeStartFlow = 0l; - private DistributionEngineConfiguration deConfiguration; - private String envName; - private long retryInterval; - private long currentRetryInterval; - private long maxInterval; - // private boolean active = false; - boolean maximumRetryInterval = false; - private AtomicBoolean status = null; - ComponentsUtils componentsUtils = null; - DistributionEnginePollingTask distributionEnginePollingTask = null; - - private CambriaHandler cambriaHandler = new CambriaHandler(); - - public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) { - super(); - this.delayBeforeStartFlow = delayBeforeStartFlow; - this.deConfiguration = deConfiguration; - this.envName = envName; - this.retryInterval = deConfiguration.getInitRetryIntervalSec(); - this.currentRetryInterval = retryInterval; - this.maxInterval = deConfiguration.getInitMaxIntervalSec(); - this.status = status; - this.componentsUtils = componentsUtils; - this.distributionEnginePollingTask = distributionEnginePollingTask; - } - - private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - - private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName()); - - ScheduledFuture<?> scheduledFuture = null; - - public void startTask() { - if (scheduledExecutorService != null) { - Integer retryInterval = deConfiguration.getInitRetryIntervalSec(); - logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow); - this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS); - - } - } - - public void restartTask() { - - this.stopTask(); - - logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval); - - long lastCurrentInterval = currentRetryInterval; - incrementRetryInterval(); - - this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS); - - } - - protected void incrementRetryInterval() { - if (currentRetryInterval < maxInterval) { - currentRetryInterval *= 2; - if (currentRetryInterval > maxInterval) { - setMaxRetryInterval(); - } - } else { - setMaxRetryInterval(); - } - } - - private void setMaxRetryInterval() { - currentRetryInterval = maxInterval; - maximumRetryInterval = true; - logger.debug("Set next retry init interval to {}", maxInterval); - } - - public void stopTask() { - if (scheduledFuture != null) { - boolean result = scheduledFuture.cancel(true); - logger.debug("Stop reinit task. result = {}", result); - if (false == result) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task"); - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task"); - } - scheduledFuture = null; - } - } - - public void destroy() { - this.stopTask(); - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdown(); - } - } - - @Override - public void run() { - - boolean result = false; - result = initFlow(); - - if (true == result) { - this.stopTask(); - this.status.set(true); - if (this.distributionEnginePollingTask != null) { - String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName); - logger.debug("start polling distribution status topic {}", topicName); - this.distributionEnginePollingTask.startTask(topicName); - } - } else { - if (false == maximumRetryInterval) { - this.restartTask(); - } - } - } - - /** - * run initialization flow - * - * @return - */ - public boolean initFlow() { - - logger.trace("Start init flow for environment {}", this.envName); - - Set<String> topicsList = null; - Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; - - getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers()); - if (getTopicsRes.isRight()) { - CambriaErrorResponse status = getTopicsRes.right().value(); - if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { - topicsList = new HashSet<>(); - } else { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); - - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); - - return false; - } - } else { - topicsList = getTopicsRes.left().value(); - } - - String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); - logger.debug("Going to handle topic {}", notificationTopic); - - boolean status = createTopicIfNotExists(topicsList, notificationTopic); - if (false == status) { - return false; - } - - CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); - - CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); - - if (createStatus != CambriaOperationStatus.OK) { - return false; - } - - String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); - logger.debug("Going to handle topic {}", statusTopic); - status = createTopicIfNotExists(topicsList, statusTopic); - if (false == status) { - return false; - } - - CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); - - if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) { - return false; - } - - return true; - } - - private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { - CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType); - - String role = CONSUMER; - if (subscriberType == SubscriberTypeEnum.PRODUCER) { - role = PRODUCER; - } - auditRegistration(topicName, registerStatus, role); - return registerStatus; - } - - private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) { - if (componentsUtils != null) { - Integer httpCode = registerProducerStatus.getHttpCode(); - String httpCodeStr = String.valueOf(httpCode); - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr); - } - } - - private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) { - - if (topicsList.contains(topicName)) { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); - } - return true; - } - - CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(), - deConfiguration.getCreateTopic().getReplicationCount()); - - CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus(); - if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); - } - } else if (status == CambriaOperationStatus.OK) { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED); - } - } else { - if (componentsUtils != null) { - this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED); - } - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); - - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); - - return false; - } - - return true; - } - - public static String buildTopicName(String topicName, String environment) { - return topicName + "-" + environment.toUpperCase(); - } - - public boolean isActive() { - return this.status.get(); - } - - public long getCurrentRetryInterval() { - return currentRetryInterval; - } - - protected void setCambriaHandler(CambriaHandler cambriaHandler) { - this.cambriaHandler = cambriaHandler; - } + public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine"; + public static final String ALREADY_EXISTS = "ALREADY_EXISTS"; + public static final String CONSUMER = "CONSUMER"; + public static final String PRODUCER = "PRODUCER"; + public static final String CREATED = "CREATED"; + public static final String FAILED = "FAILED"; + public static final Integer HTTP_OK = 200; + + private Long delayBeforeStartFlow = 0l; + private DistributionEngineConfiguration deConfiguration; + private String envName; + private long retryInterval; + private long currentRetryInterval; + private long maxInterval; + boolean maximumRetryInterval = false; + private AtomicBoolean status = null; + ComponentsUtils componentsUtils = null; + DistributionEnginePollingTask distributionEnginePollingTask = null; + private OperationalEnvironmentEntry environmentEntry; + + private CambriaHandler cambriaHandler = new CambriaHandler(); + + public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) { + super(); + this.delayBeforeStartFlow = delayBeforeStartFlow; + this.deConfiguration = deConfiguration; + this.envName = envName; + this.retryInterval = deConfiguration.getInitRetryIntervalSec(); + this.currentRetryInterval = retryInterval; + this.maxInterval = deConfiguration.getInitMaxIntervalSec(); + this.status = status; + this.componentsUtils = componentsUtils; + this.distributionEnginePollingTask = distributionEnginePollingTask; + this.environmentEntry = environmentEntry; + } + + private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + + private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class); + + ScheduledFuture<?> scheduledFuture = null; + + public void startTask() { + if (scheduledExecutorService != null) { + Integer retryInterval = deConfiguration.getInitRetryIntervalSec(); + logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow); + this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS); + + } + } + + public void restartTask() { + + this.stopTask(); + + logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval); + + long lastCurrentInterval = currentRetryInterval; + incrementRetryInterval(); + + this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS); + + } + + protected void incrementRetryInterval() { + if (currentRetryInterval < maxInterval) { + currentRetryInterval *= 2; + if (currentRetryInterval > maxInterval) { + setMaxRetryInterval(); + } + } else { + setMaxRetryInterval(); + } + } + + private void setMaxRetryInterval() { + currentRetryInterval = maxInterval; + maximumRetryInterval = true; + logger.debug("Set next retry init interval to {}", maxInterval); + } + + public void stopTask() { + if (scheduledFuture != null) { + boolean result = scheduledFuture.cancel(true); + logger.debug("Stop reinit task. result = {}", result); + if (false == result) { + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task"); + } + scheduledFuture = null; + } + } + + public void destroy() { + this.stopTask(); + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Override + public void run() { + + boolean result = false; + result = initFlow(); + + if (true == result) { + this.stopTask(); + this.status.set(true); + if (this.distributionEnginePollingTask != null) { + String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName); + logger.debug("start polling distribution status topic {}", topicName); + this.distributionEnginePollingTask.startTask(topicName); + } + } else { + if (false == maximumRetryInterval) { + this.restartTask(); + } + } + } + + /** + * run initialization flow + * + * @return + */ + public boolean initFlow() { + + logger.trace("Start init flow for environment {}", this.envName); + + Set<String> topicsList = null; + Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; + + getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList())); + if (getTopicsRes.isRight()) { + CambriaErrorResponse status = getTopicsRes.right().value(); + if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { + topicsList = new HashSet<>(); + } else { + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); + return false; + } + } else { + topicsList = getTopicsRes.left().value(); + } + + String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); + logger.debug("Going to handle topic {}", notificationTopic); + + boolean status = createTopicIfNotExists(topicsList, notificationTopic); + if (false == status) { + return false; + } + + CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); + + CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); + + if (createStatus != CambriaOperationStatus.OK) { + return false; + } + + String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); + logger.debug("Going to handle topic {}", statusTopic); + status = createTopicIfNotExists(topicsList, statusTopic); + if (false == status) { + return false; + } + + CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); + + if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) { + return false; + } + + return true; + } + + private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { + CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName); + + String role = CONSUMER; + if (subscriberType == SubscriberTypeEnum.PRODUCER) { + role = PRODUCER; + } + auditRegistration(topicName, registerStatus, role); + return registerStatus; + } + + private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) { + if (componentsUtils != null) { + Integer httpCode = registerProducerStatus.getHttpCode(); + String httpCodeStr = String.valueOf(httpCode); + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr); + } + } + + private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) { + + if (topicsList.contains(topicName)) { + if (componentsUtils != null) { + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); + } + return true; + } + + CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(), + deConfiguration.getCreateTopic().getReplicationCount()); + + CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus(); + if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) { + if (componentsUtils != null) { + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS); + } + } else if (status == CambriaOperationStatus.OK) { + if (componentsUtils != null) { + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED); + } + } else { + if (componentsUtils != null) { + this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED); + } + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName); + return false; + } + + return true; + } + + public static String buildTopicName(String topicName, String environment) { + return topicName + "-" + environment.toUpperCase(); + } + + public boolean isActive() { + return this.status.get(); + } + + public long getCurrentRetryInterval() { + return currentRetryInterval; + } + + protected void setCambriaHandler(CambriaHandler cambriaHandler) { + this.cambriaHandler = cambriaHandler; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java index fc7c473d6b..b4f4863284 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java @@ -20,188 +20,189 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - +import com.att.nsa.cambria.client.CambriaConsumer; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import fj.data.Either; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig; import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.openecomp.sdc.common.config.EcompErrorName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -import fj.data.Either; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; public class DistributionEnginePollingTask implements Runnable { - public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling"; - - private String topicName; - private ComponentsUtils componentUtils; - private int fetchTimeoutInSec = 15; - private int pollingIntervalInSec; - private String consumerId; - private String consumerGroup; - private DistributionEngineConfiguration distributionEngineConfiguration; - - private CambriaHandler cambriaHandler = new CambriaHandler(); - private Gson gson = new GsonBuilder().setPrettyPrinting().create(); - - private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build()); - - private static Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class.getName()); - - ScheduledFuture<?> scheduledFuture = null; - private CambriaConsumer cambriaConsumer = null; - - private DistributionEngineClusterHealth distributionEngineClusterHealth = null; - - public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, String envName, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth) { - - this.componentUtils = componentUtils; - this.distributionEngineConfiguration = distributionEngineConfiguration; - DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic(); - this.pollingIntervalInSec = statusConfig.getPollingIntervalSec(); - this.fetchTimeoutInSec = statusConfig.getFetchTimeSec(); - this.consumerGroup = statusConfig.getConsumerGroup(); - this.consumerId = statusConfig.getConsumerId(); - this.distributionEngineClusterHealth = distributionEngineClusterHealth; - } - - public void startTask(String topicName) { - - this.topicName = topicName; - logger.debug("start task for polling topic {}", topicName); - if (fetchTimeoutInSec < 15) { - logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default"); - fetchTimeoutInSec = 15; - } - try { - cambriaConsumer = cambriaHandler.createConsumer(distributionEngineConfiguration.getUebServers(), topicName, distributionEngineConfiguration.getUebPublicKey(), distributionEngineConfiguration.getUebSecretKey(), consumerId, consumerGroup, - fetchTimeoutInSec * 1000); - - if (scheduledPollingService != null) { - logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec); - scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS); - - } - } catch (Exception e) { - logger.debug("unexpected error occured", e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); - } - } - - public void stopTask() { - if (scheduledFuture != null) { - boolean result = scheduledFuture.cancel(true); - logger.debug("Stop polling task. result = {}", result); - if (false == result) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "try to stop the polling task"); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task"); - } - scheduledFuture = null; - } - if (cambriaConsumer != null) { - logger.debug("close consumer"); - cambriaHandler.closeConsumer(cambriaConsumer); - } - - } - - public void destroy() { - this.stopTask(); - shutdownExecutor(); - } - - @Override - public void run() { - logger.trace("run() method. polling queue {}", topicName); - - try { - // init error - if (cambriaConsumer == null) { - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); - stopTask(); - return; - } - - Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); - // fetch error - if (fetchResult.isRight()) { - CambriaErrorResponse errorResponse = fetchResult.right().value(); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value()); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value()); - - // TODO: if status== internal error (connection problem) change - // state to inactive - // in next try, if succeed - change to active - return; - } - - // success - Iterable<String> messages = fetchResult.left().value(); - for (String message : messages) { - logger.trace("received message {}", message); - try { - DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class); - componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(), - String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason()); - - distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown(); - - } catch (Exception e) { - logger.debug("failed to convert message to object", e); - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value()); - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value()); - } - - } - } catch (Exception e) { - logger.debug("unexpected error occured", e); - String methodName = new Object() { - }.getClass().getEnclosingMethod().getName(); - - BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage()); - BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); - } - - } - - private void shutdownExecutor() { - if (scheduledPollingService == null) - return; - - scheduledPollingService.shutdown(); // Disable new tasks from being - // submitted - try { - // Wait a while for existing tasks to terminate - if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) { - scheduledPollingService.shutdownNow(); // Cancel currently - // executing tasks - // Wait a while for tasks to respond to being cancelled - if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) - logger.debug("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - scheduledPollingService.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } + public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling"; + + private String topicName; + private ComponentsUtils componentUtils; + private int fetchTimeoutInSec = 15; + private int pollingIntervalInSec; + private String consumerId; + private String consumerGroup; + + private CambriaHandler cambriaHandler = new CambriaHandler(); + private Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private DistributionCompleteReporter distributionCompleteReporter; + + private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build()); + + private static final Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class); + + ScheduledFuture<?> scheduledFuture = null; + private CambriaConsumer cambriaConsumer = null; + + private DistributionEngineClusterHealth distributionEngineClusterHealth = null; + + private OperationalEnvironmentEntry environmentEntry; + + public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) { + + this.componentUtils = componentUtils; + DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic(); + this.pollingIntervalInSec = statusConfig.getPollingIntervalSec(); + this.fetchTimeoutInSec = statusConfig.getFetchTimeSec(); + this.consumerGroup = statusConfig.getConsumerGroup(); + this.consumerId = statusConfig.getConsumerId(); + this.distributionEngineClusterHealth = distributionEngineClusterHealth; + this.environmentEntry = environmentEntry; + this.distributionCompleteReporter = distributionCompleteReporter; + } + + public void startTask(String topicName) { + + this.topicName = topicName; + logger.debug("start task for polling topic {}", topicName); + if (fetchTimeoutInSec < 15) { + logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default"); + fetchTimeoutInSec = 15; + } + try { + cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup, + fetchTimeoutInSec * 1000); + + if (scheduledPollingService != null) { + logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec); + scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS); + + } + } catch (Exception e) { + logger.debug("unexpected error occured", e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + } + } + + public void stopTask() { + if (scheduledFuture != null) { + boolean result = scheduledFuture.cancel(true); + logger.debug("Stop polling task. result = {}", result); + if (false == result) { + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task"); + } + scheduledFuture = null; + } + if (cambriaConsumer != null) { + logger.debug("close consumer"); + cambriaHandler.closeConsumer(cambriaConsumer); + } + + } + + public void destroy() { + this.stopTask(); + shutdownExecutor(); + } + + @Override + public void run() { + logger.trace("run() method. polling queue {}", topicName); + + try { + // init error + if (cambriaConsumer == null) { + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); + stopTask(); + return; + } + + Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); + // fetch error + if (fetchResult.isRight()) { + CambriaErrorResponse errorResponse = fetchResult.right().value(); + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse); + + // TODO: if status== internal error (connection problem) change + // state to inactive + // in next try, if succeed - change to active + return; + } + + // success + Iterable<String> messages = fetchResult.left().value(); + for (String message : messages) { + logger.trace("received message {}", message); + try { + DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class); + handleDistributionNotificationMsg(notification); + distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown(); + } catch (Exception e) { + logger.debug("failed to convert message to object", e); + BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value()); + } + + } + } catch (Exception e) { + logger.debug("unexpected error occured", e); + String methodName = new Object() { + }.getClass().getEnclosingMethod().getName(); + + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage()); + } + + } + + private void handleDistributionNotificationMsg(DistributionStatusNotification notification) { + componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(), + String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason()); + if (notification.isDistributionCompleteNotification()) { + distributionCompleteReporter.reportDistributionComplete(notification); + } + } + + private void shutdownExecutor() { + if (scheduledPollingService == null) + return; + + scheduledPollingService.shutdown(); // Disable new tasks from being + // submitted + try { + // Wait a while for existing tasks to terminate + if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) { + scheduledPollingService.shutdownNow(); // Cancel currently + // executing tasks + // Wait a while for tasks to respond to being cancelled + if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) + logger.debug("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + scheduledPollingService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java index 16a0a1dc31..62af4b8514 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java @@ -20,20 +20,13 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; - -import javax.annotation.PreDestroy; - import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionNotificationTopicConfig; +import org.openecomp.sdc.be.dao.api.ActionStatus; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; import org.openecomp.sdc.be.impl.ComponentsUtils; import org.openecomp.sdc.be.model.Service; -import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus; -import org.openecomp.sdc.be.model.operations.impl.InterfaceLifecycleOperation; import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.openecomp.sdc.common.util.ThreadLocalsHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -41,79 +34,83 @@ import org.springframework.stereotype.Component; @Component("distributionNotificationSender") public class DistributionNotificationSender { - protected static final String DISTRIBUTION_NOTIFICATION_SENDING = "distributionNotificationSending"; - - private static Logger logger = LoggerFactory.getLogger(DistributionNotificationSender.class.getName()); - - // final String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/"; - // final String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL - // + "resources/%s/%s/artifacts/%s"; - // final String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s"; - - @javax.annotation.Resource - InterfaceLifecycleOperation interfaceLifecycleOperation; - - @javax.annotation.Resource - protected ComponentsUtils componentUtils; - - ExecutorService executorService = null; - - CambriaHandler cambriaHandler = new CambriaHandler(); - - NotificationExecutorService notificationExecutorService = new NotificationExecutorService(); - - public DistributionNotificationSender() { - super(); - - DistributionNotificationTopicConfig distributionNotificationTopic = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionNotificationTopic(); - - executorService = notificationExecutorService.createExcecutorService(distributionNotificationTopic); - } - - @PreDestroy - public void shutdown() { - logger.debug("Going to close notificationExecutorService"); - if (executorService != null) { - - long maxWaitingTime = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds(); - - notificationExecutorService.shutdownAndAwaitTermination(executorService, maxWaitingTime + 1); - } - } - - public StorageOperationStatus sendNotification(String topicName, String distributionId, DistributionEngineConfiguration deConfiguration, String envName, INotificationData notificationData, Service service, String userId, String modifierName) { - - Runnable task = new PublishNotificationRunnable(envName, distributionId, service, notificationData, deConfiguration, topicName, userId, modifierName, cambriaHandler, componentUtils, ThreadLocalsHolder.getUuid()); - try { - executorService.submit(task); - } catch (RejectedExecutionException e) { - logger.warn("Failed to submit task. Number of threads exceeeds", e); - return StorageOperationStatus.OVERLOAD; - } - - return StorageOperationStatus.OK; - } - - /** - * Audit the publishing notification in case of internal server error. - * - * @param topicName - * @param status - * @param distributionId - * @param envName - */ - private void auditDistributionNotificationInternalServerError(String topicName, StorageOperationStatus status, String distributionId, String envName) { - - if (this.componentUtils != null) { - this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, "", " ", "Service", " ", " ", " ", envName, " ", topicName, distributionId, "Error: Internal Server Error. " + status, " "); - } - } - - protected CambriaErrorResponse publishNotification(INotificationData data, DistributionEngineConfiguration deConfiguration, String topicName) { - - CambriaErrorResponse status = cambriaHandler.sendNotification(topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebServers(), data); + protected static final String DISTRIBUTION_NOTIFICATION_SENDING = "distributionNotificationSending"; + + private static final Logger logger = LoggerFactory.getLogger(DistributionNotificationSender.class); + + @javax.annotation.Resource + protected ComponentsUtils componentUtils; + private CambriaHandler cambriaHandler = new CambriaHandler(); + private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + + public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData, INotificationData notificationData, Service service, String userId, String modifierName) { + long startTime = System.currentTimeMillis(); + CambriaErrorResponse status = cambriaHandler.sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), messageBusData.getDmaaPuebEndpoints(), notificationData, + deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode()); + auditDistributionNotification(topicName, distributionId, status, service, messageBusData.getEnvId(), userId, modifierName, notificationData.getWorkloadContext(), messageBusData.getTenant()); + long endTime = System.currentTimeMillis(); + logger.debug("After building and publishing artifacts object. Total took {} milliseconds", (endTime - startTime)); + return convertCambriaResponse(status); + } + + private void auditDistributionNotification(String topicName, String distributionId, CambriaErrorResponse status, Service service, String envId, String userId, String modifierName, String workloadContext, String tenant) { + if (this.componentUtils != null) { + Integer httpCode = status.getHttpCode(); + String httpCodeStr = String.valueOf(httpCode); + + String desc = getDescriptionFromErrorResponse(status); + + this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, service.getUUID(), service.getName(), "Service", service.getVersion(), userId, modifierName, envId, service.getLifecycleState().name(), topicName, + distributionId, desc, httpCodeStr, workloadContext, tenant); + } + } + + private String getDescriptionFromErrorResponse(CambriaErrorResponse status) { + + CambriaOperationStatus operationStatus = status.getOperationStatus(); + + switch (operationStatus) { + case OK: + return "OK"; + case AUTHENTICATION_ERROR: + return "Error: Authentication problem towards U-EB server"; + case INTERNAL_SERVER_ERROR: + return "Error: Internal U-EB server error"; + case UNKNOWN_HOST_ERROR: + return "Error: Cannot reach U-EB server host"; + case CONNNECTION_ERROR: + return "Error: Cannot connect to U-EB server"; + case OBJECT_NOT_FOUND: + return "Error: object not found in U-EB server"; + default: + return "Error: Internal Cambria server problem"; + + } + + } + + private ActionStatus convertCambriaResponse(CambriaErrorResponse status) { + CambriaOperationStatus operationStatus = status.getOperationStatus(); + + switch (operationStatus) { + case OK: + return ActionStatus.OK; + case AUTHENTICATION_ERROR: + return ActionStatus.AUTHENTICATION_ERROR; + case INTERNAL_SERVER_ERROR: + return ActionStatus.GENERAL_ERROR; + case UNKNOWN_HOST_ERROR: + return ActionStatus.UNKNOWN_HOST; + case CONNNECTION_ERROR: + return ActionStatus.CONNNECTION_ERROR; + case OBJECT_NOT_FOUND: + return ActionStatus.OBJECT_NOT_FOUND; + default: + return ActionStatus.GENERAL_ERROR; + + } + } - return status; - } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java index 73a0336361..006aa26082 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotification.java @@ -22,63 +22,67 @@ package org.openecomp.sdc.be.components.distribution.engine; public class DistributionStatusNotification { - String distributionID; - String consumerID; - long timestamp; - String artifactURL; - DistributionStatusNotificationEnum status; - String errorReason; - - public String getDistributionID() { - return distributionID; - } - - public void setDistributionID(String distributionId) { - this.distributionID = distributionId; - } - - public String getConsumerID() { - return consumerID; - } - - public void setConsumerID(String consumerId) { - this.consumerID = consumerId; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public String getArtifactURL() { - return artifactURL; - } - - public void setArtifactURL(String artifactURL) { - this.artifactURL = artifactURL; - } - - public DistributionStatusNotificationEnum getStatus() { - return status; - } - - public void setStatus(DistributionStatusNotificationEnum status) { - this.status = status; - } - - public String getErrorReason() { - return errorReason; - } - - public void setErrorReason(String errorReason) { - this.errorReason = errorReason; - } - - @Override - public String toString() { - return "DistributionStatusNotification [distributionId=" + distributionID + ", consumerId=" + consumerID + ", timestamp=" + timestamp + ", artifactURL=" + artifactURL + ", status=" + status + ", errorReason=" + errorReason + "]"; - } + String distributionID; + String consumerID; + long timestamp; + String artifactURL; + DistributionStatusNotificationEnum status; + String errorReason; + + public String getDistributionID() { + return distributionID; + } + + public void setDistributionID(String distributionId) { + this.distributionID = distributionId; + } + + public String getConsumerID() { + return consumerID; + } + + public void setConsumerID(String consumerId) { + this.consumerID = consumerId; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getArtifactURL() { + return artifactURL; + } + + public void setArtifactURL(String artifactURL) { + this.artifactURL = artifactURL; + } + + public DistributionStatusNotificationEnum getStatus() { + return status; + } + + public void setStatus(DistributionStatusNotificationEnum status) { + this.status = status; + } + + public String getErrorReason() { + return errorReason; + } + + public void setErrorReason(String errorReason) { + this.errorReason = errorReason; + } + + public boolean isDistributionCompleteNotification() { + return DistributionStatusNotificationEnum.DISTRIBUTION_COMPLETE_OK.equals(status) || DistributionStatusNotificationEnum.DISTRIBUTION_COMPLETE_ERROR.equals(status); + } + + @Override + public String toString() { + return "DistributionStatusNotification [distributionId=" + distributionID + ", consumerId=" + consumerID + ", timestamp=" + timestamp + ", artifactURL=" + artifactURL + ", status=" + status + ", errorReason=" + errorReason + "]"; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java index bd77f3915a..65aa18ee97 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionStatusNotificationEnum.java @@ -22,5 +22,5 @@ package org.openecomp.sdc.be.components.distribution.engine; public enum DistributionStatusNotificationEnum { - DOWNLOAD_OK, DOWNLOAD_ERROR, ALREADY_DOWNLOADED, DEPLOY_OK, DEPLOY_ERROR, ALREADY_DEPLOYED, NOTIFIED, NOT_NOTIFIED + DOWNLOAD_OK, DOWNLOAD_ERROR, ALREADY_DOWNLOADED, DEPLOY_OK, DEPLOY_ERROR, ALREADY_DEPLOYED, NOTIFIED, NOT_NOTIFIED, COMPONENT_DONE_ERROR, COMPONENT_DONE_OK, DISTRIBUTION_COMPLETE_OK, DISTRIBUTION_COMPLETE_ERROR } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java new file mode 100644 index 0000000000..3d35a84d7a --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java @@ -0,0 +1,91 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import fj.data.Either; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.openecomp.sdc.security.SecurityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Properties; + +/** + * Allows to create DMAAP client of type MRConsumer according received configuration parameters + */ +@Component("dmaapClientFactory") +public class DmaapClientFactory { + private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class); + + /** + * Creates DMAAP consumer according to received parameters + * @param parameters + * @return an instance object of type MRConsumer + * @throws IOException + */ + public MRConsumer create(DmaapConsumerConfiguration parameters) throws Exception { + MRConsumer consumer = MRClientFactory.createConsumer(buildProperties(parameters)); + logger.info("MRConsumer created for topic {}", parameters.getTopic()); + return consumer; + } + + private Properties buildProperties(DmaapConsumerConfiguration parameters) throws Exception{ + Properties props = new Properties(); + Either<String,String> passkey = SecurityUtil.INSTANCE.decrypt(parameters.getCredential().getPassword() ); + if (passkey.isRight()){ + throw new GeneralSecurityException("invalid password, cannot build properties"); + } + props.setProperty("Latitude", Double.toString(parameters.getLatitude())); + props.setProperty("Longitude", Double.toString(parameters.getLongitude())); + props.setProperty("Version", parameters.getVersion()); + props.setProperty("ServiceName", parameters.getServiceName()); + props.setProperty("Environment", parameters.getEnvironment()); + props.setProperty("Partner", parameters.getPartner()); + props.setProperty("routeOffer", parameters.getRouteOffer()); + props.setProperty("Protocol", parameters.getProtocol()); + props.setProperty("username", parameters.getCredential().getUsername()); + props.setProperty("password", passkey.left().value() ); + props.setProperty("contenttype", parameters.getContenttype()); + props.setProperty("host", parameters.getHosts()); + props.setProperty("topic", parameters.getTopic()); + props.setProperty("group", parameters.getConsumerGroup()); + props.setProperty("id", parameters.getConsumerId()); + props.setProperty("timeout", Integer.toString(parameters.getTimeoutMs())); + props.setProperty("limit", Integer.toString(parameters.getLimit())); + props.setProperty("AFT_DME2_REQ_TRACE_ON", Boolean.toString(parameters.isDme2TraceOn())); + props.setProperty("AFT_ENVIRONMENT", parameters.getAftEnvironment()); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", Integer.toString(parameters.getAftDme2ConnectionTimeoutMs())); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", Integer.toString(parameters.getAftDme2RoundtripTimeoutMs())); + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", Integer.toString(parameters.getAftDme2ReadTimeoutMs())); + + String dme2PreferredRouterFilePath = parameters.getDme2preferredRouterFilePath(); + ensureFileExists(dme2PreferredRouterFilePath); + props.setProperty("DME2preferredRouterFilePath", dme2PreferredRouterFilePath); + + props.setProperty("TransportType", "DME2"); + props.setProperty("SubContextPath", "/"); + props.setProperty("MethodType", "GET"); + props.setProperty("authKey", ""); + props.setProperty("authDate", ""); + props.setProperty("filter", ""); + props.setProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", ""); + props.setProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS", ""); + props.setProperty("sessionstickinessrequired", "no"); + + return props; + } + + private void ensureFileExists(String filePath) throws IOException { + File file = new File(filePath); + if(file.createNewFile()) { + logger.info("The file {} has been created on the disk", file.getAbsolutePath()); + } + else{ + logger.info("The file {} already exists", file.getAbsolutePath()); + } + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java new file mode 100644 index 0000000000..e0661f4b22 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapConsumer.java @@ -0,0 +1,76 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.nsa.mr.client.MRConsumer; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Allows consuming DMAAP topic according to received consumer parameters + * Allows processing received messages. + */ +@Service +public class DmaapConsumer { + private final ExecutorFactory executorFactory; + private final DmaapClientFactory dmaapClientFactory; + private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class); + + @Autowired + private DmaapHealth dmaapHealth; + /** + * Allows to create an object of type DmaapConsumer + * @param executorFactory + * @param dmaapClientFactory + */ + @Autowired + public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) { + this.executorFactory = executorFactory; + this.dmaapClientFactory = dmaapClientFactory; + } + + /** + * Allows consuming DMAAP topic according to received consumer parameters + * @param notificationReceived + * @param exceptionHandler + * @throws Exception + */ + public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception { + + DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration(); + String topic = dmaapConsumerParams.getTopic(); + logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams); + MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams); + ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client"); + ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler); + + pollExecutor.scheduleWithFixedDelay(() -> { + logger.info("Trying to fetch messages from topic: {}", topic); + boolean isTopicAvailable = false; + try { + Iterable<String> messages = consumer.fetch(); + isTopicAvailable = true ; + if (messages != null) { + for (String msg : messages) { + logger.info("The DMAAP message {} received. The topic is {}.", msg, topic); + notificationExecutor.execute(() -> notificationReceived.accept(msg)); + } + } + //successfully fetched + } + catch (Exception e) { + logger.error("The exception {} occured upon fetching DMAAP message", e); + } + dmaapHealth.report( isTopicAvailable ); + }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS); + } + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java new file mode 100644 index 0000000000..ba5e9f7c9c --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapHealth.java @@ -0,0 +1,220 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import org.apache.commons.validator.routines.UrlValidator; +import org.apache.http.client.utils.URIUtils; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.openecomp.sdc.common.api.HealthCheckInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.commons.lang3.StringUtils.*; +import static org.openecomp.sdc.common.api.Constants.HC_COMPONENT_DMAAP_ENGINE; + +@Component("dmaapHealth") +public class DmaapHealth { + + + protected static final String DMAAP_HEALTH_LOG_CONTEXT = "dmaap.healthcheck"; + private static final String DMAAP_HEALTH_CHECK_STR = "dmaapHealthCheck"; + private static final Logger log = LoggerFactory.getLogger(DmaapHealth.class); + private static final Logger logHealth = LoggerFactory.getLogger(DMAAP_HEALTH_LOG_CONTEXT); + private HealthCheckInfo healthCheckInfo = DmaapHealth.HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); + private long healthCheckReadTimeout = 20; + private long reconnectInterval = 5; + private HealthCheckScheduledTask healthCheckScheduledTask = null ; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private ScheduledFuture<?> scheduledFuture = null; + private DmaapConsumerConfiguration configuration = null ; + + private volatile AtomicBoolean lastHealthState = new AtomicBoolean(false); + private volatile AtomicBoolean reportedHealthState = null; + + public enum HealthCheckInfoResult { + OK(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.UP, null, DmaapStatusDescription.OK.getDescription())), + UNAVAILABLE(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.UNAVAILABLE.getDescription())), + DOWN(new HealthCheckInfo(HC_COMPONENT_DMAAP_ENGINE, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.DOWN.getDescription())); + + private HealthCheckInfo healthCheckInfo; + HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) { + this.healthCheckInfo = healthCheckInfo; + } + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } + } + + public enum DmaapStatusDescription { + OK("OK"), UNAVAILABLE("Dmaap is not available"),DOWN("DOWN"), NOT_CONFIGURED("Dmaap configuration is missing/wrong "); + + private String desc; + DmaapStatusDescription(String desc) { + this.desc = desc; + } + public String getDescription() { + return desc; + } + + } + + @PostConstruct + public DmaapHealth init() { + log.trace("Enter init method of Dmaap health"); + synchronized (DmaapHealth.class){ + this.configuration = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration(); + + Integer pollingInterval = configuration.getPollingInterval(); + if (pollingInterval != null && pollingInterval!=0) { + reconnectInterval = pollingInterval; + } + Integer healthCheckReadTimeoutConfig = configuration.getTimeoutMs(); + if (healthCheckReadTimeoutConfig != null) { + this.healthCheckReadTimeout = healthCheckReadTimeoutConfig; + } + this.healthCheckScheduledTask = new HealthCheckScheduledTask( configuration ); //what is the representation? csv? delimiter? json or other + startHealthCheckTask(true); + } + log.trace("Exit init method of DistributionEngineClusterHealth"); + return this; + } + + @PreDestroy + protected void destroy() { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + if (scheduler != null) { + scheduler.shutdown(); + } + } + + /** + * Start health check task. + * + * @param startTask + */ + public void startHealthCheckTask( boolean startTask ) { + synchronized (DmaapHealth.class){ + if (startTask && this.scheduledFuture == null) { + this.scheduledFuture = this.scheduler.scheduleAtFixedRate(this.healthCheckScheduledTask , 0, reconnectInterval, TimeUnit.SECONDS); + } + } + } + + public void report(Boolean isUp){ + if (reportedHealthState == null) + reportedHealthState = new AtomicBoolean(isUp); + reportedHealthState.set(isUp); + } + + public void logAlarm(boolean lastHealthState) { + try{ + if ( lastHealthState ) { + BeEcompErrorManager.getInstance().logDmaapHealthCheckRecovery( DMAAP_HEALTH_CHECK_STR ); + } else { + BeEcompErrorManager.getInstance().logDmaapHealthCheckError( DMAAP_HEALTH_CHECK_STR ); + } + }catch( Exception e ){ + log.debug("cannot logAlarm -> {}" ,e ); + } + } + + public DmaapConsumerConfiguration getConfiguration() { + return configuration; + } + + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } + + /** + * Health Check Task Scheduler - infinite check. + */ + public class HealthCheckScheduledTask implements Runnable { + private final DmaapConsumerConfiguration config; + private static final int timeout = 8192; + + public HealthCheckScheduledTask(final DmaapConsumerConfiguration config){ + this.config = config; + } + @Override + public void run() { + logHealth.trace("Executing Dmaap Health Check Task - Start"); + boolean prevIsReachable = false; + boolean reachable = false; + //first try simple ping + try{ + if ( reportedHealthState != null ){ + reachable = reportedHealthState.get(); + } + else{ + reachable = false; + } + prevIsReachable = lastHealthState.getAndSet( reachable ); + healthCheckInfo = reachable ? HealthCheckInfoResult.OK.healthCheckInfo : HealthCheckInfoResult.DOWN.healthCheckInfo; + } + catch( Exception e ){ + log.debug("{} | cannot check connectivity -> {}",DMAAP_HEALTH_CHECK_STR, e ); + prevIsReachable = lastHealthState.getAndSet(false); + healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.healthCheckInfo; + } + if (prevIsReachable != lastHealthState.get()) + logAlarm( lastHealthState.get() ); + } + + + /** + * @deprecated (health is reported outside from EnvironmentEngine consumer fetch) + */ + @Deprecated + public boolean isICMPReachable( ) throws IOException{ + try{ + String hostname = getUrlHost(config.getHosts()); + return InetAddress.getByName( hostname ).isReachable(timeout); + }catch( URISyntaxException e ){ + log.debug("{} | malformed host configuration -> ",DMAAP_HEALTH_CHECK_STR , e); + } + return false; + } + } + + public static String getUrlHost(String qualifiedHost) throws URISyntaxException{ + //region - parse complex format ex. <http://URL:PORT> + try{ + UrlValidator validator = new UrlValidator(); + if (validator.isValid(qualifiedHost)){ + return URIUtils.extractHost(new URI(qualifiedHost)).getHostName(); + }else{ + log.debug("{} | invalid url format, continuing ", DMAAP_HEALTH_CHECK_STR ); + } + }catch(URISyntaxException e){ + log.debug("{} | invalid url format, continuing {} ", DMAAP_HEALTH_CHECK_STR , e); + } + //endregion + + //region - try shortcut format <URL> or <URL:PORT> + if ( countMatches( qualifiedHost , ":") <= 1){ + String[] address = qualifiedHost.split(":"); + if ( address.length>0 && isNotBlank(address[0]) ){ + return address[0]; + } + } + //endregion + throw new URISyntaxException( qualifiedHost , "invalid hostname, expecting a single <host:port> , (valid ex. www.google.com:80 | www.google.com | http:\\\\www.google.com:8181)"); + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java new file mode 100644 index 0000000000..3f86fe73de --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapNotificationDataImpl.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.components.distribution.engine; + +/* Example { + "operationalEnvironmentId": "28122015552391", + "operationalEnvironmentName": "Operational Environment Name", + "operationalEnvironmentType": "ECOMP", + "tenantContext": "TEST", + "workloadContext": "ECOMP_E2E-IST", + "action": "Create" + }*/ +public class DmaapNotificationDataImpl implements IDmaapNotificationData, IDmaapAuditNotificationData { + + private String operationalEnvironmentId; + private String operationalEnvironmentType; + private String action; + private String operationalEnvironmentName; + private String tenantContext; + + @Override + public String getOperationalEnvironmentId() { + return operationalEnvironmentId; + } + + @Override + public OperationaEnvironmentTypeEnum getOperationalEnvironmentType() { + return OperationaEnvironmentTypeEnum.findByName(operationalEnvironmentType); + } + + @Override + public DmaapActionEnum getAction() { + return DmaapActionEnum.findByName(action); + } + + @Override + public String getOperationalEnvironmentName() { + return operationalEnvironmentName; + } + + @Override + public String getTenantContext() { + return tenantContext; + } + + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java new file mode 100644 index 0000000000..67977b6361 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentMessageBusData.java @@ -0,0 +1,75 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; + +import java.util.ArrayList; +import java.util.List; + +/** + * a pojo which holds all the necessary data to communicate with the message bus + * this class is a reflection ot the {@link OperationalEnvironmentEntry} class + */ +public class EnvironmentMessageBusData { + + private List<String> dmaaPuebEndpoints; + + private String uebPublicKey; + + private String uebPrivateKey; + + private String envId; + + private String tenant; + + public EnvironmentMessageBusData() { + } + + public EnvironmentMessageBusData(OperationalEnvironmentEntry operationalEnvironment) { + this.dmaaPuebEndpoints = new ArrayList<>(operationalEnvironment.getDmaapUebAddress()); + this.uebPublicKey = operationalEnvironment.getUebApikey(); + this.uebPrivateKey = operationalEnvironment.getUebSecretKey(); + this.envId = operationalEnvironment.getEnvironmentId(); + this.tenant = operationalEnvironment.getTenant(); + } + + public String getTenant() { + return tenant; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public List<String> getDmaaPuebEndpoints() { + return dmaaPuebEndpoints; + } + + public void setDmaaPuebEndpoints(List<String> dmaaPuebEndpoints) { + this.dmaaPuebEndpoints = dmaaPuebEndpoints; + } + + public String getUebPublicKey() { + return uebPublicKey; + } + + public void setUebPublicKey(String uebPublicKey) { + this.uebPublicKey = uebPublicKey; + } + + public String getUebPrivateKey() { + return uebPrivateKey; + } + + public void setUebPrivateKey(String uebPrivateKey) { + this.uebPrivateKey = uebPrivateKey; + } + + public String getEnvId() { + return envId; + } + + public void setEnvId(String envId) { + this.envId = envId; + } + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java new file mode 100644 index 0000000000..822464c631 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/EnvironmentsEngine.java @@ -0,0 +1,526 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.aft.dme2.api.DME2Exception; +import com.att.aft.dme2.iterator.DME2EndpointIterator; +import com.att.aft.dme2.iterator.domain.DME2EndpointReference; +import com.att.aft.dme2.manager.registry.DME2Endpoint; +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import fj.data.Either; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpStatus; +import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum; +import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum; +import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.openecomp.sdc.be.config.DmeConfiguration; +import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus; +import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao; +import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum; +import org.openecomp.sdc.be.impl.ComponentsUtils; +import org.openecomp.sdc.be.info.OperationalEnvInfo; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; +import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; +import org.openecomp.sdc.common.datastructure.Wrapper; +import org.openecomp.sdc.common.http.client.api.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut; + +/** + * Allows to consume DMAAP topic and handle received notifications + */ +@Service +public class EnvironmentsEngine implements INotificationHandler { + + private static final String MESSAGE_BUS = "MessageBus"; + private static final String UNKNOWN = "Unknown"; + private static final Logger log = LoggerFactory.getLogger(EnvironmentsEngine.class); + private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager(); + + private Map<String, OperationalEnvironmentEntry> environments; + private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>(); + private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>(); + private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>(); + + private final DmaapConsumer dmaapConsumer; + private final OperationalEnvironmentDao operationalEnvironmentDao; + private final DME2EndpointIteratorCreator epIterCreator; + private final AaiRequestHandler aaiRequestHandler; + private final ComponentsUtils componentUtils; + private final CambriaHandler cambriaHandler; + private final DistributionEngineClusterHealth distributionEngineClusterHealth; + private final DistributionCompleteReporter distributionCompleteReporter; + + public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) { + this.dmaapConsumer = dmaapConsumer; + this.operationalEnvironmentDao = operationalEnvironmentDao; + this.epIterCreator = epIterCreator; + this.aaiRequestHandler = aaiRequestHandler; + this.componentUtils = componentUtils; + this.cambriaHandler = cambriaHandler; + this.distributionEngineClusterHealth = distributionEngineClusterHealth; + this.distributionCompleteReporter = distributionCompleteReporter; + } + + @VisibleForTesting + @PostConstruct + void init() { + log.trace("Environments engine has been initialized. "); + try { + environments = populateEnvironments(); + createUebTopicsForEnvironments(); + dmaapConsumer.consumeDmaapTopic(this::handleMessage, + (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e)); + } + catch (Exception e) { + log.error("An error occurred upon consuming topic by Dmaap consumer client." , e); + } + } + public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry, + AtomicBoolean status, + Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){ + connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask); + + } + + public void connectUebTopicForDistributionConfTopic(String envName, + AtomicBoolean status, + Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){ + connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask); + + } + /** + * Allows to create and run UEB initializing and polling tasks + * @param status + * @param envNamePerInitTask + * @param envNamePerPollingTask + * @param opEnvEntry + */ + private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status, + Map<String, DistributionEngineInitTask> envNamePerInitTask, + Map<String, DistributionEnginePollingTask> envNamePerPollingTask) { + + String envId = opEnvEntry.getEnvironmentId(); + + DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager() + .getDistributionEngineConfiguration(); + DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask( + distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth, + opEnvEntry); + String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0); + DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l, + distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask, + opEnvEntry); + distributionEngineInitTask.startTask(); + envNamePerInitTask.put(envId, distributionEngineInitTask); + envNamePerPollingTask.put(envId, distributionEnginePollingTask); + + log.debug("Environment envId = {} has been connected to the UEB topic", envId); + } + + @Override + public boolean handleMessage(String notification) { + DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager() + .getConfiguration().getDmaapConsumerConfiguration(); + Supplier<Boolean> supplier = () -> handleMessageLogic(notification); + Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier, + dmaapConsumerParams.getTimeLimitForNotificationHandleMs()); + + boolean result; + if (eitherTimeOut.isRight()) { + result = false; + } else { + result = eitherTimeOut.left().value(); + } + return result; + } + + public boolean handleMessageLogic(String notification) { + Wrapper<Boolean> errorWrapper = new Wrapper<>(); + Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>(); + try { + + log.debug("handle message - for operational environment notification received: {}", notification); + Gson gsonObj = new GsonBuilder().create(); + + IDmaapNotificationData notificationData = gsonObj.fromJson(notification, + DmaapNotificationDataImpl.class); + IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification, + DmaapNotificationDataImpl.class); + + AuditingActionEnum actionEnum; + switch(notificationData.getAction()) { + case CREATE: + actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT; + break; + case UPDATE: + actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT; + break; + case DELETE: + actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT; + break; + default: + actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION; + break; + } + componentUtils.auditEnvironmentEngine(actionEnum, + notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(), + notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(), + auditNotificationData.getTenantContext()); + + if (errorWrapper.isEmpty()) { + validateNotification(errorWrapper, notificationData, auditNotificationData); + } + // Perform Save In-Progress Dao + if (errorWrapper.isEmpty()) { + saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData); + } + + if (errorWrapper.isEmpty()) { + buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement()); + } + + } catch (Exception e) { + log.debug("handle message for operational environmet failed for notification: {} with error :{}", + notification, e.getMessage(), e); + errorWrapper.setInnerElement(false); + + } + return errorWrapper.isEmpty(); + } + + private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData, + IDmaapAuditNotificationData auditNotificationData) { + // Check OperationaEnvironmentType + if (errorWrapper.isEmpty()) { + validateEnvironmentType(errorWrapper, notificationData, auditNotificationData); + } + // Check Action Type + if (errorWrapper.isEmpty()) { + validateActionType(errorWrapper, notificationData); + } + // Check is valid for create/update (not In-Progress state) + if (errorWrapper.isEmpty()) { + validateState(errorWrapper, notificationData); + } + } + + public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + // Get Env Info From A&AI + if (errorWrapper.isEmpty()) { + retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry); + } + + if (errorWrapper.isEmpty()) { + // Get List Of UEB Addresses From AFT_DME + retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry); + } + + // Create UEB keys and set them on EnvEntry + if (errorWrapper.isEmpty()) { + createUebKeys(errorWrapper, opEnvEntry); + } + + // Create Topics + if (errorWrapper.isEmpty()) { + log.debug("handle message - Create Topics"); + createUebTopicsForEnvironment(opEnvEntry); + } + + // Save Status Complete and Add to Map + if (errorWrapper.isEmpty()) { + saveEntryWithCompleteStatus(errorWrapper, opEnvEntry); + } + + // Update Environments Map + if (errorWrapper.isEmpty()) { + environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry); + } + else{ + saveEntryWithFailedStatus(errorWrapper, opEnvEntry); + } + } + + private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + log.debug("handle message - save OperationalEnvironment Failed Status"); + opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED); + saveOpEnvEntry(errorWrapper, opEnvEntry); + } + + void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + log.debug("handle message - save OperationalEnvironment Complete Dao"); + opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED); + saveOpEnvEntry(errorWrapper, opEnvEntry); + + } + + void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + log.debug("handle message - Get List Of UEB Addresses From AFT_DME"); + try { + boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext()); + if( isKeyFieldsValid ){ + String opEnvKey = map2OpEnvKey(opEnvEntry); + String environmentId = opEnvEntry.getEnvironmentId(); + List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId); + opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet())); + } + else{ + errorWrapper.setInnerElement(false); + log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields."); + } + + } catch (DME2Exception e) { + errorWrapper.setInnerElement(false); + log.error("Failed to retrieve Ueb Addresses From DME. ", e); + } + } + + void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + log.debug("handle message - Create UEB keys"); + List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream() + .collect(Collectors.toList()); + Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler + .createUebKeys(discoverEndPoints); + if (eitherCreateUebKeys.isRight()) { + errorWrapper.setInnerElement(false); + log.debug("handle message - failed to create UEB Keys"); + } else { + ApiCredential apiCredential = eitherCreateUebKeys.left().value(); + opEnvEntry.setUebApikey(apiCredential.getApiKey()); + opEnvEntry.setUebSecretKey(apiCredential.getApiSecret()); + } + } + + void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) { + log.debug("handle message - Get Env Info From A&AI"); + Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById( + opEnvEntry.getEnvironmentId()); + if (eitherOperationalEnvInfo.isRight()) { + errorWrapper.setInnerElement(false); + log.debug("handle message - failed to retrieve details from A&AI"); + } else { + OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value(); + opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext()); + opEnvEntry.setTenant(operationalEnvInfo.getTenantContext()); + } + } + + void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) { + log.debug("handle message - save OperationalEnvironment In-Progress Dao"); + OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry(); + // Entry Environment ID holds actually the environment NAME + opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId()); + opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS); + opEnvEntry.setIsProduction(false); + saveOpEnvEntry(errorWrapper, opEnvEntry); + opEnvEntryWrapper.setInnerElement(opEnvEntry); + + } + + + void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) { + log.debug("handle message - verify OperationalEnvironment not In-Progress"); + String opEnvId = notificationData.getOperationalEnvironmentId(); + + Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao + .get(opEnvId); + if (eitherOpEnv.isLeft()) { + final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value(); + if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) { + errorWrapper.setInnerElement(false); + log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus()); + } + } else { + CassandraOperationStatus operationStatus = eitherOpEnv.right().value(); + if (operationStatus != CassandraOperationStatus.NOT_FOUND) { + errorWrapper.setInnerElement(false); + log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId, + operationStatus); + } + } + + } + + void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) { + log.debug("handle message - verify Action Type"); + DmaapActionEnum action = notificationData.getAction(); + if (action == DmaapActionEnum.DELETE) { + errorWrapper.setInnerElement(false); + log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action); + } + } + + void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData, + IDmaapAuditNotificationData auditNotificationData) { + log.debug("handle message - verify OperationaEnvironmentType"); + OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType(); + if (envType != OperationaEnvironmentTypeEnum.ECOMP) { + errorWrapper.setInnerElement(false); + log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType); + componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE, + notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(), + notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(), + auditNotificationData.getTenantContext()); + } + } + + + private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) { + entry.setLastModified(new Date(System.currentTimeMillis())); + CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry); + if (saveStaus != CassandraOperationStatus.OK) { + errorWrapper.setInnerElement(false); + log.debug("handle message saving operational environmet failed for id :{} with error : {}", + entry.getEnvironmentId(), saveStaus); + } + } + + public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception { + DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration(); + List<String> uebHosts = new LinkedList<>(); + + String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey, + env); + DME2EndpointIterator iterator = epIterCreator.create(lookupURI); + + // Beginning iteration + while (iterator.hasNext()) { + DME2EndpointReference ref = iterator.next(); + DME2Endpoint dmeEndpoint = ref.getEndpoint(); + log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort()); + uebHosts.add(dmeEndpoint.getHost()); + } + + return uebHosts; + } + + private String map2OpEnvKey(OperationalEnvironmentEntry entry) { + return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS); + } + + private Map<String, OperationalEnvironmentEntry> populateEnvironments() { + Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb(); + OperationalEnvironmentEntry confEntry = readEnvFromConfig(); + envs.put(confEntry.getEnvironmentId(), confEntry); + return envs; + } + + private OperationalEnvironmentEntry readEnvFromConfig() { + OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry(); + DistributionEngineConfiguration distributionEngineConfiguration = configurationManager + .getDistributionEngineConfiguration(); + entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey()); + entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey()); + + Set<String> puebEndpoints = new HashSet<>(); + puebEndpoints.addAll(distributionEngineConfiguration.getUebServers()); + entry.setDmaapUebAddress(puebEndpoints); + + String envName = distributionEngineConfiguration.getEnvironments().size() == 1 + ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN; + entry.setEnvironmentId(envName); + + if(log.isDebugEnabled()) { + log.debug("Enviroment read from configuration: {}", entry.toString()); + } + + return entry; + } + + private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() { + Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao + .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED); + + if (opEnvResult.isLeft()) { + Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream() + .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity())); + resultMap.forEach( (key, value) -> log.debug("Enviroment loaded from DB: {}", value.toString()) ); + return resultMap; + } else { + CassandraOperationStatus status = opEnvResult.right().value(); + log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status); + return new HashMap<>(); + } + } + + void createUebTopicsForEnvironments() { + environments.values().forEach(this::createUebTopicsForEnvironment); + } + + public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) { + String envId = opEnvEntry.getEnvironmentId(); + log.debug("Create Environment {} on UEB Topic.", envId); + AtomicBoolean status = new AtomicBoolean(false); + envNamePerStatus.put(envId, status); + + connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask); + } + + @VisibleForTesting + void setConfigurationManager(ConfigurationManager configurationManager) { + this.configurationManager = configurationManager; + } + + public Map<String, OperationalEnvironmentEntry> getEnvironments() { + return environments; + } + + + public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) { + HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id); + if (resp.getStatusCode() == HttpStatus.SC_OK) { + try { + OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse()); + + log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo); + return Either.left(operationalEnvInfo); + } catch (Exception e) { + log.debug("Json convert to OperationalEnvInfo failed with exception ", e); + return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR); + } + } else { + log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id, + resp.getStatusCode(), resp.getDescription()); + return Either.right(resp.getStatusCode()); + } + } + + public OperationalEnvironmentEntry getEnvironmentById (String envId) { + return environments.get(envId); + } + + public boolean isInMap(OperationalEnvironmentEntry env) { + return isInMap(env.getEnvironmentId()); + } + + public boolean isInMap(String envId) { + return environments.containsKey(envId); + } + + public void addToMap(OperationalEnvironmentEntry opEnvEntry) { + environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry); + + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java new file mode 100644 index 0000000000..29579c64b7 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ExecutorFactory.java @@ -0,0 +1,43 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +@Component("executorFactory") +/** + * Allows to create next kinds of single thread executors: SingleThreadExecutor and SingleThreadScheduledExecutor + */ +public class ExecutorFactory { + + private static final Logger logger = LoggerFactory.getLogger(EnvironmentsEngine.class); + + public ExecutorService create(String name, UncaughtExceptionHandler exceptionHandler){ + logger.info("Going to create single thread executor. "); + ThreadFactory threadFactory = createThreadFactory(name, exceptionHandler); + return Executors.newSingleThreadExecutor(threadFactory); + } + + public ScheduledExecutorService createScheduled(String name){ + logger.info("Going to create single thread scheduled executor. "); + ThreadFactory threadFactory = createThreadFactory(name, + (t, e) -> LoggerFactory.getLogger(UncaughtExceptionHandler.class).error("An error occurred: ", e)); + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + private ThreadFactory createThreadFactory(String name, UncaughtExceptionHandler exceptionHandler) { + String nameFormat = name + "-%d"; + return new ThreadFactoryBuilder() + .setThreadFactory(Executors.defaultThreadFactory()) + .setNameFormat(nameFormat) + .setUncaughtExceptionHandler(exceptionHandler) + .setDaemon(true) + .build(); + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java index 169f4f3efa..4a917710a9 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IArtifactInfo.java @@ -24,40 +24,40 @@ import org.openecomp.sdc.common.api.ArtifactTypeEnum; public interface IArtifactInfo { - /** Artifact File name */ - String getArtifactName(); - - /** - * Artifact Type.<br> - * Following are valid values : HEAT , DG_XML. <br> - * List of values will be extended in post-1510 releases. - */ - ArtifactTypeEnum getArtifactType(); - - /** - * Relative artifact's URL. Should be used in REST GET API to download the artifact's payload.<br> - * The full artifact URL will be in the following format :<br> - * https://{serverBaseURL}/{resourcePath}<br> - * serverBaseURL - Hostname ( ASDC LB FQDN) + optional port <br> - * resourcePath - "artifactURL" <br> - * Ex : https://asdc.sdc.com/v1/catalog/services/srv1/2.0/resources/aaa/1.0/artifacts/aaa.yml - */ - String getArtifactURL(); - - /** - * Base-64 encoded MD5 checksum of the artifact's payload.<br> - * Should be used for data integrity validation when an artifact's payload is downloaded.<br> - */ - String getArtifactChecksum(); - - /** - * Installation timeout. Used by the orchestrator. - */ - Integer getArtifactTimeout(); - - /** - * Artifact description - */ - String getArtifactDescription(); + /** Artifact File name */ + String getArtifactName(); + + /** + * Artifact Type.<br> + * Following are valid values : HEAT , DG_XML. <br> + * List of values will be extended in post-1510 releases. + */ + ArtifactTypeEnum getArtifactType(); + + /** + * Relative artifact's URL. Should be used in REST GET API to download the artifact's payload.<br> + * The full artifact URL will be in the following format :<br> + * https://{serverBaseURL}/{resourcePath}<br> + * serverBaseURL - Hostname ( ASDC LB FQDN) + optional port <br> + * resourcePath - "artifactURL" <br> + * Ex : https://asdc.sdc.com/v1/catalog/services/srv1/2.0/resources/aaa/1.0/artifacts/aaa.yml + */ + String getArtifactURL(); + + /** + * Base-64 encoded MD5 checksum of the artifact's payload.<br> + * Should be used for data integrity validation when an artifact's payload is downloaded.<br> + */ + String getArtifactChecksum(); + + /** + * Installation timeout. Used by the orchestrator. + */ + Integer getArtifactTimeout(); + + /** + * Artifact description + */ + String getArtifactDescription(); } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java index a27156616b..96abfe087c 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,26 +20,35 @@ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.dao.api.ActionStatus; import org.openecomp.sdc.be.model.Service; import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus; - -import fj.data.Either; +import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; public interface IDistributionEngine { - public boolean isActive(); + boolean isActive(); + + ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName); + + ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName, String userId, String modifierName); + + StorageOperationStatus isEnvironmentAvailable(String envName); + + StorageOperationStatus isEnvironmentAvailable(); - public StorageOperationStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, String userId, String modifierName); + /** + * Currently, it used for tests. For real implementation we need cancel the initialization task and the polling task. + * + * @param envName + */ + void disableEnvironment(String envName); - public StorageOperationStatus isEnvironmentAvailable(String envName); + StorageOperationStatus isReadyForDistribution(Service service, String envName); - /** - * Currently, it used for tests. For real implementation we need cancel the initialization task and the polling task. - * - * @param envName - */ - public void disableEnvironment(String envName); + INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext); - public Either<INotificationData, StorageOperationStatus> isReadyForDistribution(Service service, String distributionId, String envName); + StorageOperationStatus verifyServiceHasDeploymentArtifacts(Service service); + OperationalEnvironmentEntry getEnvironmentById(String opEnvId); } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java new file mode 100644 index 0000000000..c1b7a313fb --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapAuditNotificationData.java @@ -0,0 +1,6 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +public interface IDmaapAuditNotificationData { + String getOperationalEnvironmentName(); + String getTenantContext(); +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java new file mode 100644 index 0000000000..7b974e8a96 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDmaapNotificationData.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.components.distribution.engine; + +import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.getEnumValueByFieldValue; + +public interface IDmaapNotificationData { + String getOperationalEnvironmentId(); + + OperationaEnvironmentTypeEnum getOperationalEnvironmentType(); + + DmaapActionEnum getAction(); + + + + + + enum DmaapActionEnum { + DELETE("Delete"), + CREATE("Create"), + UPDATE("Update"), + UNKONW("UNKONW") + + ; + private String actionName; + + private DmaapActionEnum(String actionName) { + this.actionName = actionName; + } + + public String getActionName() { + return actionName; + } + + public static DmaapActionEnum findByName(String actionName){ + return getEnumValueByFieldValue(actionName, DmaapActionEnum.values(), DmaapActionEnum::getActionName, UNKONW, false); + } + }; + enum OperationaEnvironmentTypeEnum { + ECOMP("ECOMP"), + UNKONW("UNKONW") + ; + private String eventTypenName; + + private OperationaEnvironmentTypeEnum(String eventTypenName) { + this.eventTypenName = eventTypenName; + } + + public String getEventTypenName() { + return eventTypenName; + } + + public static OperationaEnvironmentTypeEnum findByName(String operationalEnvironmentTypeName){ + return getEnumValueByFieldValue(operationalEnvironmentTypeName, OperationaEnvironmentTypeEnum.values(), OperationaEnvironmentTypeEnum::getEventTypenName, UNKONW, false); + } + }; + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java index d631724701..d66f8f92f1 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationData.java @@ -23,84 +23,84 @@ package org.openecomp.sdc.be.components.distribution.engine; import java.util.List; public interface INotificationData { - /** - * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.<br> - * Generated UUID is compliant with RFC 4122.<br> - * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> - * Ex.: AA97B177-9383-4934-8543-0F91A7A02836 - */ - String getDistributionID(); - - /** Logical Service Name. */ - String getServiceName(); - - /** - * Service Version.<br> - * Two dot (".") separated digit blocks.<br> - * Ex. : "2.0" - */ - String getServiceVersion(); - - /** - * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br> - * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> - * Ex. : AA97B177-9383-4934-8543-0F91A7A02836 - */ - String getServiceUUID(); - - /** - * Service description - */ - String getServiceDescription(); - - /** - * ServiceInvariant UUID - */ - String getServiceInvariantUUID(); - - /** List of the resource instances */ - List<JsonContainerResourceInstance> getResources(); - - /** List of the artifacts */ - List<ArtifactInfoImpl> getServiceArtifacts(); - - String getWorkloadContext(); - - void setDistributionID(String distributionId); - - /** Logical Service Name. */ - void setServiceName(String serviceName); - - /** - * Service Version.<br> - * Two dot (".") separated digit blocks.<br> - * Ex. : "2.0" - */ - void setServiceVersion(String serviceVersion); - - /** - * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br> - * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> - * Ex. : AA97B177-9383-4934-8543-0F91A7A02836 - */ - void setServiceUUID(String serviceUUID); - - /** - * Service Description - */ - void setServiceDescription(String serviceDescription); - - /** - * ServiceInvariant UUID - */ - void setServiceInvariantUUID(String serviceInvariantUuid); - - /** List of the Resource Instances */ - void setResources(List<JsonContainerResourceInstance> resource); - - /** List of the Resource Instances */ - void setServiceArtifacts(List<ArtifactInfoImpl> artifacts); - - void setWorkloadContext(String workloadContext); + /** + * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.<br> + * Generated UUID is compliant with RFC 4122.<br> + * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> + * Ex.: AA97B177-9383-4934-8543-0F91A7A02836 + */ + String getDistributionID(); + + /** Logical Service Name. */ + String getServiceName(); + + /** + * Service Version.<br> + * Two dot (".") separated digit blocks.<br> + * Ex. : "2.0" + */ + String getServiceVersion(); + + /** + * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br> + * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> + * Ex. : AA97B177-9383-4934-8543-0F91A7A02836 + */ + String getServiceUUID(); + + /** + * Service description + */ + String getServiceDescription(); + + /** + * ServiceInvariant UUID + */ + String getServiceInvariantUUID(); + + /** List of the resource instances */ + List<JsonContainerResourceInstance> getResources(); + + /** List of the artifacts */ + List<ArtifactInfoImpl> getServiceArtifacts(); + + String getWorkloadContext(); + + void setDistributionID(String distributionId); + + /** Logical Service Name. */ + void setServiceName(String serviceName); + + /** + * Service Version.<br> + * Two dot (".") separated digit blocks.<br> + * Ex. : "2.0" + */ + void setServiceVersion(String serviceVersion); + + /** + * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br> + * It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br> + * Ex. : AA97B177-9383-4934-8543-0F91A7A02836 + */ + void setServiceUUID(String serviceUUID); + + /** + * Service Description + */ + void setServiceDescription(String serviceDescription); + + /** + * ServiceInvariant UUID + */ + void setServiceInvariantUUID(String serviceInvariantUuid); + + /** List of the Resource Instances */ + void setResources(List<JsonContainerResourceInstance> resource); + + /** List of the Resource Instances */ + void setServiceArtifacts(List<ArtifactInfoImpl> artifacts); + + void setWorkloadContext(String workloadContext); } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java new file mode 100644 index 0000000000..a1936c61dd --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/INotificationHandler.java @@ -0,0 +1,11 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +public interface INotificationHandler { + /** + * Allows to handle received topic message + * @param notification + * @return true if finished successfully otherwise false + */ + public boolean handleMessage(String notification); + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java index 9a77b9f94f..deac8751b3 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IResourceArtifactInfo.java @@ -22,18 +22,18 @@ package org.openecomp.sdc.be.components.distribution.engine; public interface IResourceArtifactInfo extends IArtifactInfo { - /** resource name */ - String getResourceName(); + /** resource name */ + String getResourceName(); - /** resource version */ - String getResourceVersion(); + /** resource version */ + String getResourceVersion(); - /** - * Global UUID of the resource that specific artifact belongs to.<br> - * It is generated by ASDC per each resource version.<br> - * Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-"). <br> - * Ex.: AA97B177-9383-4934-8543-0F91A7A02836 - */ - String getResourceUUID(); + /** + * Global UUID of the resource that specific artifact belongs to.<br> + * It is generated by ASDC per each resource version.<br> + * Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-"). <br> + * Ex.: AA97B177-9383-4934-8543-0F91A7A02836 + */ + String getResourceUUID(); } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java index 5efcfe7fa8..db0e1e938d 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/JsonContainerResourceInstance.java @@ -20,109 +20,129 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.List; - import org.openecomp.sdc.be.model.ComponentInstance; +import java.util.List; + public class JsonContainerResourceInstance { - private String resourceInstanceName, resourceName, resourceVersion, resoucreType, resourceUUID, resourceInvariantUUID, resourceCustomizationUUID, category, subcategory; - private List<ArtifactInfoImpl> artifacts; - - public JsonContainerResourceInstance(ComponentInstance resourceInstance, String resourceType, List<ArtifactInfoImpl> artifacts) { - super(); - this.resourceInstanceName = resourceInstance.getName(); - this.resourceName = resourceInstance.getComponentName(); - this.resourceVersion = resourceInstance.getComponentVersion(); - this.resoucreType = resourceType; - this.resourceUUID = resourceInstance.getComponentUid(); - this.artifacts = artifacts; - this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID(); - } - - public String getResourceInstanceName() { - return resourceInstanceName; - } - - public void setResourceInstanceName(String resourceInstanceName) { - this.resourceInstanceName = resourceInstanceName; - } - - public String getResourceName() { - return resourceName; - } - - public void setResourceName(String resourceName) { - this.resourceName = resourceName; - } - - public String getResourceVersion() { - return resourceVersion; - } - - public void setResourceVersion(String resourceVersion) { - this.resourceVersion = resourceVersion; - } - - public String getResoucreType() { - return resoucreType; - } - - public void setResoucreType(String resoucreType) { - this.resoucreType = resoucreType; - } - - public String getResourceUUID() { - return resourceUUID; - } - - public void setResourceUUID(String resourceUUID) { - this.resourceUUID = resourceUUID; - } - - public List<ArtifactInfoImpl> getArtifacts() { - return artifacts; - } - - public void setArtifacts(List<ArtifactInfoImpl> artifacts) { - this.artifacts = artifacts; - } - - public String getResourceInvariantUUID() { - return resourceInvariantUUID; - } - - public void setResourceInvariantUUID(String resourceInvariantUUID) { - this.resourceInvariantUUID = resourceInvariantUUID; - } - - public String getResourceCustomizationUUID() { - return resourceCustomizationUUID; - } - - public void setResourceCustomizationUUID(String customizationUUID) { - this.resourceCustomizationUUID = customizationUUID; - } - - public String getCategory() { - return category; - } - - public void setCategory(String category) { - this.category = category; - } - - public String getSubcategory() { - return subcategory; - } - - public void setSubcategory(String subcategory) { - this.subcategory = subcategory; - } - - @Override - public String toString() { - return "JsonContainerResourceInstance [resourceInstanceName=" + resourceInstanceName + ", resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resoucreType=" + resoucreType + ", resourceUUID=" + resourceUUID - + ", resourceInvariantUUID=" + resourceInvariantUUID + ", resourceCustomizationUUID=" + resourceCustomizationUUID + ", category=" + category + ", subcategory=" + subcategory + ", artifacts=" + artifacts + "]"; - } - + private String resourceInstanceName; + private String resourceName; + private String resourceVersion; + private String resourceType; + private String resourceUUID; + private String resourceInvariantUUID; + private String resourceCustomizationUUID; + private String category; + private String subcategory; + private List<ArtifactInfoImpl> artifacts; + + public JsonContainerResourceInstance(ComponentInstance resourceInstance, String resourceType, List<ArtifactInfoImpl> artifacts) { + super(); + this.resourceInstanceName = resourceInstance.getName(); + this.resourceName = resourceInstance.getComponentName(); + this.resourceVersion = resourceInstance.getComponentVersion(); + this.resourceType = resourceType; + this.resourceUUID = resourceInstance.getComponentUid(); + this.artifacts = artifacts; + this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID(); + } + + public JsonContainerResourceInstance(ComponentInstance resourceInstance, List<ArtifactInfoImpl> artifacts) { + super(); + this.resourceInstanceName = resourceInstance.getName(); + this.resourceName = resourceInstance.getComponentName(); + this.resourceVersion = resourceInstance.getComponentVersion(); + if(resourceInstance.getOriginType() != null) + this.resourceType = resourceInstance.getOriginType().getValue(); + this.resourceUUID = resourceInstance.getComponentUid(); + this.artifacts = artifacts; + this.resourceCustomizationUUID = resourceInstance.getCustomizationUUID(); + } + + public String getResourceInstanceName() { + return resourceInstanceName; + } + + public void setResourceInstanceName(String resourceInstanceName) { + this.resourceInstanceName = resourceInstanceName; + } + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + public String getResourceVersion() { + return resourceVersion; + } + + public void setResourceVersion(String resourceVersion) { + this.resourceVersion = resourceVersion; + } + + public String getResoucreType() { + return resourceType; + } + + public void setResoucreType(String resoucreType) { + this.resourceType = resoucreType; + } + + public String getResourceUUID() { + return resourceUUID; + } + + public void setResourceUUID(String resourceUUID) { + this.resourceUUID = resourceUUID; + } + + public List<ArtifactInfoImpl> getArtifacts() { + return artifacts; + } + + public void setArtifacts(List<ArtifactInfoImpl> artifacts) { + this.artifacts = artifacts; + } + + public String getResourceInvariantUUID() { + return resourceInvariantUUID; + } + + public void setResourceInvariantUUID(String resourceInvariantUUID) { + this.resourceInvariantUUID = resourceInvariantUUID; + } + + public String getResourceCustomizationUUID() { + return resourceCustomizationUUID; + } + + public void setResourceCustomizationUUID(String customizationUUID) { + this.resourceCustomizationUUID = customizationUUID; + } + + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + public String getSubcategory() { + return subcategory; + } + + public void setSubcategory(String subcategory) { + this.subcategory = subcategory; + } + + @Override + public String toString() { + return "JsonContainerResourceInstance [resourceInstanceName=" + resourceInstanceName + ", resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resoucreType=" + resourceType + ", resourceUUID=" + resourceUUID + + ", resourceInvariantUUID=" + resourceInvariantUUID + ", resourceCustomizationUUID=" + resourceCustomizationUUID + ", category=" + category + ", subcategory=" + subcategory + ", artifacts=" + artifacts + "]"; + } + } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java index 353039647d..1db67a9581 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationDataImpl.java @@ -24,111 +24,111 @@ import java.util.List; public class NotificationDataImpl implements INotificationData { - private String distributionID; - private String serviceName; - private String serviceVersion; - private String serviceUUID; - private String serviceDescription; - private String serviceInvariantUUID; - private List<JsonContainerResourceInstance> resources; - private List<ArtifactInfoImpl> serviceArtifacts; - private String workloadContext; - - @Override - public String getDistributionID() { - return distributionID; - } - - @Override - public String getServiceName() { - return serviceName; - } - - @Override - public String getServiceVersion() { - return serviceVersion; - } - - @Override - public String getServiceUUID() { - return serviceUUID; - } - - public void setDistributionID(String distributionID) { - this.distributionID = distributionID; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public void setServiceVersion(String serviceVersion) { - this.serviceVersion = serviceVersion; - } - - public void setServiceUUID(String serviceUUID) { - this.serviceUUID = serviceUUID; - } - - public String getServiceDescription() { - return serviceDescription; - } - - public void setServiceDescription(String serviceDescription) { - this.serviceDescription = serviceDescription; - } - @Override - public String getWorkloadContext() { return workloadContext; } - - @Override - public void setWorkloadContext(String workloadContext) { this.workloadContext = workloadContext; } - - @Override - public String toString() { - return "NotificationDataImpl{" + - "distributionID='" + distributionID + '\'' + - ", serviceName='" + serviceName + '\'' + - ", serviceVersion='" + serviceVersion + '\'' + - ", serviceUUID='" + serviceUUID + '\'' + - ", serviceDescription='" + serviceDescription + '\'' + - ", serviceInvariantUUID='" + serviceInvariantUUID + '\'' + - ", resources=" + resources + - ", serviceArtifacts=" + serviceArtifacts + - ", workloadContext='" + workloadContext + '\'' + - '}'; - } - - @Override - public List<JsonContainerResourceInstance> getResources() { - return resources; - } - - @Override - public void setResources(List<JsonContainerResourceInstance> resources) { - this.resources = resources; - - } - - @Override - public List<ArtifactInfoImpl> getServiceArtifacts() { - // TODO Auto-generated method stub - return serviceArtifacts; - } - - @Override - public void setServiceArtifacts(List<ArtifactInfoImpl> serviceArtifacts) { - this.serviceArtifacts = serviceArtifacts; - - } - - @Override - public String getServiceInvariantUUID() { - return serviceInvariantUUID; - } - - @Override - public void setServiceInvariantUUID(String serviceInvariantUUID) { - this.serviceInvariantUUID = serviceInvariantUUID; - } + private String distributionID; + private String serviceName; + private String serviceVersion; + private String serviceUUID; + private String serviceDescription; + private String serviceInvariantUUID; + private List<JsonContainerResourceInstance> resources; + private List<ArtifactInfoImpl> serviceArtifacts; + private String workloadContext; + + @Override + public String getDistributionID() { + return distributionID; + } + + @Override + public String getServiceName() { + return serviceName; + } + + @Override + public String getServiceVersion() { + return serviceVersion; + } + + @Override + public String getServiceUUID() { + return serviceUUID; + } + + public void setDistributionID(String distributionID) { + this.distributionID = distributionID; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public void setServiceVersion(String serviceVersion) { + this.serviceVersion = serviceVersion; + } + + public void setServiceUUID(String serviceUUID) { + this.serviceUUID = serviceUUID; + } + + public String getServiceDescription() { + return serviceDescription; + } + + public void setServiceDescription(String serviceDescription) { + this.serviceDescription = serviceDescription; + } + @Override + public String getWorkloadContext() { return workloadContext; } + + @Override + public void setWorkloadContext(String workloadContext) { this.workloadContext = workloadContext; } + + @Override + public String toString() { + return "NotificationDataImpl{" + + "distributionID='" + distributionID + '\'' + + ", serviceName='" + serviceName + '\'' + + ", serviceVersion='" + serviceVersion + '\'' + + ", serviceUUID='" + serviceUUID + '\'' + + ", serviceDescription='" + serviceDescription + '\'' + + ", serviceInvariantUUID='" + serviceInvariantUUID + '\'' + + ", resources=" + resources + + ", serviceArtifacts=" + serviceArtifacts + + ", workloadContext='" + workloadContext + '\'' + + '}'; + } + + @Override + public List<JsonContainerResourceInstance> getResources() { + return resources; + } + + @Override + public void setResources(List<JsonContainerResourceInstance> resources) { + this.resources = resources; + + } + + @Override + public List<ArtifactInfoImpl> getServiceArtifacts() { + // TODO Auto-generated method stub + return serviceArtifacts; + } + + @Override + public void setServiceArtifacts(List<ArtifactInfoImpl> serviceArtifacts) { + this.serviceArtifacts = serviceArtifacts; + + } + + @Override + public String getServiceInvariantUUID() { + return serviceInvariantUUID; + } + + @Override + public void setServiceInvariantUUID(String serviceInvariantUUID) { + this.serviceInvariantUUID = serviceInvariantUUID; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java index 74fbb2c660..dc58a24e5f 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/NotificationExecutorService.java @@ -20,62 +20,57 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionNotificationTopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.*; public class NotificationExecutorService { - private static Logger logger = LoggerFactory.getLogger(NotificationExecutorService.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(NotificationExecutorService.class); - public ExecutorService createExcecutorService(DistributionNotificationTopicConfig distributionNotificationTopic) { + public ExecutorService createExcecutorService(DistributionNotificationTopicConfig distributionNotificationTopic) { - Integer minThreadPoolSize = distributionNotificationTopic.getMinThreadPoolSize(); - if (minThreadPoolSize == null) { - minThreadPoolSize = 0; - } + Integer minThreadPoolSize = distributionNotificationTopic.getMinThreadPoolSize(); + if (minThreadPoolSize == null) { + minThreadPoolSize = 0; + } - Integer maxThreadPoolSize = distributionNotificationTopic.getMaxThreadPoolSize(); - if (maxThreadPoolSize == null) { - maxThreadPoolSize = 10; - } + Integer maxThreadPoolSize = distributionNotificationTopic.getMaxThreadPoolSize(); + if (maxThreadPoolSize == null) { + maxThreadPoolSize = 10; + } - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setNameFormat("distribution-notification-thread-%d"); - ThreadFactory threadFactory = threadFactoryBuilder.build(); + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setNameFormat("distribution-notification-thread-%d"); + ThreadFactory threadFactory = threadFactoryBuilder.build(); - ExecutorService executorService = new ThreadPoolExecutor(minThreadPoolSize, maxThreadPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); + ExecutorService executorService = new ThreadPoolExecutor(minThreadPoolSize, maxThreadPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); - return executorService; - } + return executorService; + } - public void shutdownAndAwaitTermination(ExecutorService pool, long maxTimeToWait) { + public void shutdownAndAwaitTermination(ExecutorService pool, long maxTimeToWait) { - logger.debug("shutdown NotificationExecutorService"); - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) { - logger.debug("Failed to close executor service"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } + logger.debug("shutdown NotificationExecutorService"); + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(maxTimeToWait, TimeUnit.SECONDS)) { + logger.debug("Failed to close executor service"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java deleted file mode 100644 index c283ecc92b..0000000000 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/PublishNotificationRunnable.java +++ /dev/null @@ -1,156 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.be.components.distribution.engine; - -import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; -import org.openecomp.sdc.be.impl.ComponentsUtils; -import org.openecomp.sdc.be.model.Service; -import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum; -import org.openecomp.sdc.common.util.ThreadLocalsHolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PublishNotificationRunnable implements Runnable { - - private String envName; - private String distributionId; - private Service service; - private INotificationData data; - private DistributionEngineConfiguration deConfiguration; - private String topicName; - private CambriaHandler cambriaHandler; - private ComponentsUtils componentUtils; - private String userId; - private String modifierName; - private String requestId; - - private static Logger logger = LoggerFactory.getLogger(PublishNotificationRunnable.class.getName()); - - public PublishNotificationRunnable(String envName, String distributionId, Service service, INotificationData data, DistributionEngineConfiguration deConfiguration, String topicName, String userId, String modifierName, - CambriaHandler cambriaHandler, ComponentsUtils componentUtils, String requestId) { - super(); - this.envName = envName; - this.distributionId = distributionId; - this.service = service; - this.data = data; - this.deConfiguration = deConfiguration; - this.topicName = topicName; - this.cambriaHandler = cambriaHandler; - this.componentUtils = componentUtils; - this.userId = userId; - this.modifierName = modifierName; - this.requestId = requestId; - } - - public INotificationData getData() { - return data; - } - - public void setData(INotificationData data) { - this.data = data; - } - - public DistributionEngineConfiguration getDeConfiguration() { - return deConfiguration; - } - - public void setDeConfiguration(DistributionEngineConfiguration deConfiguration) { - this.deConfiguration = deConfiguration; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getUserId() { - return userId; - } - - public void setUserId(String userId) { - this.userId = userId; - } - - public String getModifierName() { - return modifierName; - } - - public void setModifierName(String modifierName) { - this.modifierName = modifierName; - } - - @Override - public void run() { - - long startTime = System.currentTimeMillis(); - ThreadLocalsHolder.setUuid(this.requestId); - - CambriaErrorResponse status = cambriaHandler.sendNotificationAndClose(topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebServers(), data, - deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); - - logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode()); - auditDistributionNotification(topicName, status, service, distributionId, envName, userId, modifierName); - - long endTime = System.currentTimeMillis(); - logger.debug("After building and publishing artifacts object. Total took {} milliseconds", (endTime - startTime)); - - } - - private void auditDistributionNotification(String topicName, CambriaErrorResponse status, Service service, String distributionId, String envName, String userId, String modifierName) { - if (this.componentUtils != null) { - Integer httpCode = status.getHttpCode(); - String httpCodeStr = String.valueOf(httpCode); - - String desc = getDescriptionFromErrorResponse(status); - - this.componentUtils.auditDistributionNotification(AuditingActionEnum.DISTRIBUTION_NOTIFY, service.getUUID(), service.getName(), "Service", service.getVersion(), userId, modifierName, envName, service.getLifecycleState().name(), topicName, - distributionId, desc, httpCodeStr); - } - } - - private String getDescriptionFromErrorResponse(CambriaErrorResponse status) { - - CambriaOperationStatus operationStatus = status.getOperationStatus(); - - switch (operationStatus) { - case OK: - return "OK"; - case AUTHENTICATION_ERROR: - return "Error: Authentication problem towards U-EB server"; - case INTERNAL_SERVER_ERROR: - return "Error: Internal U-EB server error"; - case UNKNOWN_HOST_ERROR: - return "Error: Cannot reach U-EB server host"; - case CONNNECTION_ERROR: - return "Error: Cannot connect to U-EB server"; - case OBJECT_NOT_FOUND: - return "Error: object not found in U-EB server"; - default: - return "Error: Internal Cambria server problem"; - - } - - } -} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java index 31f3cb6fda..19a857a115 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ResourceArtifactInfoImpl.java @@ -22,37 +22,37 @@ package org.openecomp.sdc.be.components.distribution.engine; public class ResourceArtifactInfoImpl extends ArtifactInfoImpl implements IResourceArtifactInfo { - private String resourceName; - private String resourceVersion; - private String resourceUUID; - - public String getResourceName() { - return resourceName; - } - - public void setResourceName(String resourceName) { - this.resourceName = resourceName; - } - - public String getResourceVersion() { - return resourceVersion; - } - - public void setResourceVersion(String resourceVersion) { - this.resourceVersion = resourceVersion; - } - - public String getResourceUUID() { - return resourceUUID; - } - - public void setResourceUUID(String resourceUUID) { - this.resourceUUID = resourceUUID; - } - - @Override - public String toString() { - return "ResourceArtifactInfoImpl [resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resourceUUID=" + resourceUUID + super.toString() + "]"; - } + private String resourceName; + private String resourceVersion; + private String resourceUUID; + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + public String getResourceVersion() { + return resourceVersion; + } + + public void setResourceVersion(String resourceVersion) { + this.resourceVersion = resourceVersion; + } + + public String getResourceUUID() { + return resourceUUID; + } + + public void setResourceUUID(String resourceUUID) { + this.resourceUUID = resourceUUID; + } + + @Override + public String toString() { + return "ResourceArtifactInfoImpl [resourceName=" + resourceName + ", resourceVersion=" + resourceVersion + ", resourceUUID=" + resourceUUID + super.toString() + "]"; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java index 50d1700f37..1d626805f6 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceArtifactInfoImpl.java @@ -22,9 +22,9 @@ package org.openecomp.sdc.be.components.distribution.engine; public class ServiceArtifactInfoImpl extends ArtifactInfoImpl implements IServiceArtifactInfo { - @Override - public String toString() { - return "ServiceArtifactInfoImpl [" + super.toString() + "]"; - } + @Override + public String toString() { + return "ServiceArtifactInfoImpl [" + super.toString() + "]"; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java index 0330a756c6..f3d17979ba 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/ServiceDistributionArtifactsBuilder.java @@ -20,26 +20,16 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - -import javax.annotation.PostConstruct; +import fj.data.Either; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.openecomp.sdc.be.config.ConfigurationManager; -import org.openecomp.sdc.be.model.ArtifactDefinition; -import org.openecomp.sdc.be.model.ComponentInstance; -import org.openecomp.sdc.be.model.ComponentParametersView; -import org.openecomp.sdc.be.model.Resource; -import org.openecomp.sdc.be.model.Service; +import org.openecomp.sdc.be.model.*; import org.openecomp.sdc.be.model.category.CategoryDefinition; import org.openecomp.sdc.be.model.category.SubCategoryDefinition; import org.openecomp.sdc.be.model.jsontitan.operations.ToscaOperationFacade; import org.openecomp.sdc.be.model.operations.api.IArtifactOperation; -import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus; import org.openecomp.sdc.be.model.operations.impl.InterfaceLifecycleOperation; import org.openecomp.sdc.common.api.ArtifactTypeEnum; import org.slf4j.Logger; @@ -47,279 +37,224 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import fj.data.Either; +import java.util.*; +import java.util.stream.Collectors; @Component("serviceDistributionArtifactsBuilder") public class ServiceDistributionArtifactsBuilder { - private int defaultArtifactInstallTimeout = 60; - - private static Logger logger = LoggerFactory.getLogger(ServiceDistributionArtifactsBuilder.class.getName()); - - final static String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/"; - final static String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resources/%s/%s/artifacts/%s"; - final static String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s"; - - final static String RESOURCE_INSTANCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resourceInstances/%s/artifacts/%s"; - - @javax.annotation.Resource - InterfaceLifecycleOperation interfaceLifecycleOperation; - - @javax.annotation.Resource - IArtifactOperation artifactOperation; - - @Autowired - ToscaOperationFacade toscaOperationFacade; - - /* - * @javax.annotation.Resource private - * InformationDeployedArtifactsBusinessLogic - * informationDeployedArtifactsBusinessLogic; - */ - - @PostConstruct - private void init() { - defaultArtifactInstallTimeout = ConfigurationManager.getConfigurationManager().getConfiguration() - .getDefaultHeatArtifactTimeoutMinutes(); - } - - public InterfaceLifecycleOperation getInterfaceLifecycleOperation() { - return interfaceLifecycleOperation; - } - - public void setInterfaceLifecycleOperation(InterfaceLifecycleOperation interfaceLifecycleOperation) { - this.interfaceLifecycleOperation = interfaceLifecycleOperation; - } - - public INotificationData buildResourceInstanceForDistribution(Service service, String distributionId) { - INotificationData notificationData = new NotificationDataImpl(); - - notificationData.setResources(convertRIToJsonContanier(service)); - notificationData.setServiceName(service.getName()); - notificationData.setServiceVersion(service.getVersion()); - notificationData.setDistributionID(distributionId); - notificationData.setServiceUUID(service.getUUID()); - notificationData.setServiceDescription(service.getDescription()); - notificationData.setServiceInvariantUUID(service.getInvariantUUID()); - String workloadContext= ConfigurationManager.getConfigurationManager().getConfiguration().getWorkloadContext(); - if(workloadContext!=null){ - notificationData.setWorkloadContext(workloadContext); - } - logger.debug("Before returning notification data object {}", notificationData); - - return notificationData; - - } - - public INotificationData buildServiceForDistribution(INotificationData notificationData, Service service) { - - notificationData.setServiceArtifacts(convertServiceArtifactsToArtifactInfo(service)); - - logger.debug("Before returning notification data object {}", notificationData); - - return notificationData; - - } - - private List<ArtifactInfoImpl> convertServiceArtifactsToArtifactInfo(Service service) { - - Map<String, ArtifactDefinition> serviceArtifactsMap = service.getDeploymentArtifacts(); - List<ArtifactDefinition> extractedServiceArtifacts = serviceArtifactsMap.values().stream() - //filters all artifacts with existing EsId - .filter(artifactDef -> artifactDef.checkEsIdExist()) - //collects all filtered artifacts with existing EsId to List - .collect(Collectors.toList()); - - Optional<ArtifactDefinition> toscaTemplateArtifactOptl = exrtactToscaTemplateArtifact(service); - if(toscaTemplateArtifactOptl.isPresent()){ - extractedServiceArtifacts.add(toscaTemplateArtifactOptl.get()); - } - - Optional<ArtifactDefinition> toscaCsarArtifactOptl = exrtactToscaCsarArtifact(service); - if(toscaCsarArtifactOptl.isPresent()){ - extractedServiceArtifacts.add(toscaCsarArtifactOptl.get()); - } - - List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertServiceArtifactToArtifactInfoImpl(service, extractedServiceArtifacts); - return artifacts; - } - - private Optional<ArtifactDefinition> exrtactToscaTemplateArtifact(Service service) { - return service.getToscaArtifacts().values().stream() - //filters TOSCA_TEMPLATE artifact - .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_TEMPLATE.getType())).findAny(); - } - - private Optional<ArtifactDefinition> exrtactToscaCsarArtifact(Service service) { - return service.getToscaArtifacts().values().stream() - //filters TOSCA_CSAR artifact - .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_CSAR.getType())).findAny(); - } - - private List<JsonContainerResourceInstance> convertRIToJsonContanier(Service service) { - List<JsonContainerResourceInstance> ret = new ArrayList<JsonContainerResourceInstance>(); - if (service.getComponentInstances() != null) { - for (ComponentInstance resourceInstance : service.getComponentInstances()) { - String resoucreType = resourceInstance.getOriginType().getValue(); - List<ArtifactDefinition> artifactsDefList = getArtifactsWithPayload(resourceInstance); - List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertToArtifactInfoImpl(service, resourceInstance, - artifactsDefList); - - String resourceInvariantUUID = null; - String resourceCategory = null; - String resourceSubcategory = null; - - ComponentParametersView componentParametersView = new ComponentParametersView(); - componentParametersView.disableAll(); - componentParametersView.setIgnoreCategories(false); - Either<Resource, StorageOperationStatus> componentResponse = toscaOperationFacade - .getToscaElement(resourceInstance.getComponentUid(), componentParametersView); - - if (componentResponse.isRight()) { - logger.debug("Resource {} Invariant UUID & Categories retrieving failed", resourceInstance.getComponentUid()); - } else { - Resource resource = componentResponse.left().value(); - resourceInvariantUUID = resource.getInvariantUUID(); - - List<CategoryDefinition> categories = resource.getCategories(); - - if (categories != null) { - CategoryDefinition categoryDefinition = categories.get(0); - - if (categoryDefinition != null) { - resourceCategory = categoryDefinition.getName(); - List<SubCategoryDefinition> subcategories = categoryDefinition.getSubcategories(); - if (null != subcategories) { - SubCategoryDefinition subCategoryDefinition = subcategories.get(0); - - if (subCategoryDefinition != null) { - resourceSubcategory = subCategoryDefinition.getName(); - } - } - } - } - } - - JsonContainerResourceInstance jsonContainer = new JsonContainerResourceInstance(resourceInstance, resoucreType, - rebuildArtifactswith120TimeoutInsteadOf60(artifacts)/*TODO used to send artifacts, the function is a fix to the short timeout bug in distribution*/); - jsonContainer.setResourceInvariantUUID(resourceInvariantUUID); - jsonContainer.setCategory(resourceCategory); - jsonContainer.setSubcategory(resourceSubcategory); - ret.add(jsonContainer); - } - } - return ret; - } - - private List<ArtifactInfoImpl> rebuildArtifactswith120TimeoutInsteadOf60(List<ArtifactInfoImpl> artifacts) { - for(ArtifactInfoImpl artifact : artifacts){ - if(artifact.getArtifactTimeout().equals(60)){ - artifact.setArtifactTimeout(120); - } - } - return artifacts; - } - - private List<ArtifactDefinition> getArtifactsWithPayload(ComponentInstance resourceInstance) { - List<ArtifactDefinition> ret = new ArrayList<ArtifactDefinition>(); - - // List<ArtifactDefinition> informationDeployedArtifacts = - // informationDeployedArtifactsBusinessLogic.getInformationalDeployedArtifactsForResourceInstance(resourceInstance); - List<ArtifactDefinition> deployableArtifacts = new ArrayList<ArtifactDefinition>(); - // deployableArtifacts.addAll(informationDeployedArtifacts); - if (resourceInstance.getDeploymentArtifacts() != null) { - deployableArtifacts.addAll(resourceInstance.getDeploymentArtifacts().values()); - } - - for (ArtifactDefinition artifactDef : deployableArtifacts) { - if (artifactDef.checkEsIdExist()) { - ret.add(artifactDef); - } - } - - return ret; - } - - /** - * build the url for resource intance artifact - * - * @param service - * @param resourceData - * @param artifactName - * @return - */ - public static String buildResourceInstanceArtifactUrl(Service service, ComponentInstance resourceInstance, - String artifactName) { - - String url = String.format(RESOURCE_INSTANCE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), - resourceInstance.getNormalizedName(), artifactName); - - logger.debug("After building artifact url {}", url); - - return url; - } - - /** - * build the url for resource intance artifact - * - * @param service - * @param resourceData - * @param artifactName - * @return - */ - public static String buildServiceArtifactUrl(Service service, String artifactName) { - - String url = String.format(SERVICE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), artifactName); - - logger.debug("After building artifact url {}", url); - - return url; - - } - - /** - * Retrieve all deployment artifacts of all resources under a given service - * - * @param resourceArtifactsResult - * @param service - * @param deConfiguration - * @return - */ - public Either<Boolean, StorageOperationStatus> isServiceContainsDeploymentArtifacts(Service service) { - - Either<Boolean, StorageOperationStatus> result = Either.left(false); - Map<String, ArtifactDefinition> serviseArtifactsMap = service.getDeploymentArtifacts(); - if (serviseArtifactsMap != null && !serviseArtifactsMap.isEmpty()) { - result = Either.left(true); - return result; - } - - List<ComponentInstance> resourceInstances = service.getComponentInstances(); - - if (resourceInstances != null) { - for (ComponentInstance resourceInstance : resourceInstances) { - - Map<String, ArtifactDefinition> deploymentArtifactsMapper = resourceInstance.getDeploymentArtifacts(); - // List<ArtifactDefinition> informationDeployedArtifacts = - // informationDeployedArtifactsBusinessLogic.getInformationalDeployedArtifactsForResourceInstance(resourceInstance); - - boolean isDeployableArtifactFound = isContainsPayload(deploymentArtifactsMapper.values());// || - // isContainsPayload(informationDeployedArtifacts); - if (isDeployableArtifactFound) { - result = Either.left(true); - break; - } - - } - - } - - return result; - } - - private boolean isContainsPayload(Collection<ArtifactDefinition> collection) { - boolean payLoadFound = collection != null && collection.stream().anyMatch(p -> p.checkEsIdExist()); - return payLoadFound; - } + private static final Logger logger = LoggerFactory.getLogger(ServiceDistributionArtifactsBuilder.class); + + static final String BASE_ARTIFACT_URL = "/sdc/v1/catalog/services/%s/%s/"; + static final String RESOURCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resources/%s/%s/artifacts/%s"; + static final String SERVICE_ARTIFACT_URL = BASE_ARTIFACT_URL + "artifacts/%s"; + static final String RESOURCE_INSTANCE_ARTIFACT_URL = BASE_ARTIFACT_URL + "resourceInstances/%s/artifacts/%s"; + + @javax.annotation.Resource + InterfaceLifecycleOperation interfaceLifecycleOperation; + + @javax.annotation.Resource + IArtifactOperation artifactOperation; + + @Autowired + ToscaOperationFacade toscaOperationFacade; + + public InterfaceLifecycleOperation getInterfaceLifecycleOperation() { + return interfaceLifecycleOperation; + } + + public void setInterfaceLifecycleOperation(InterfaceLifecycleOperation interfaceLifecycleOperation) { + this.interfaceLifecycleOperation = interfaceLifecycleOperation; + } + + private String resolveWorkloadContext(String workloadContext) { + return workloadContext != null ? workloadContext : + ConfigurationManager.getConfigurationManager().getConfiguration().getWorkloadContext(); + } + + public INotificationData buildResourceInstanceForDistribution(Service service, String distributionId, String workloadContext) { + INotificationData notificationData = new NotificationDataImpl(); + + notificationData.setResources(convertRIsToJsonContanier(service)); + notificationData.setServiceName(service.getName()); + notificationData.setServiceVersion(service.getVersion()); + notificationData.setDistributionID(distributionId); + notificationData.setServiceUUID(service.getUUID()); + notificationData.setServiceDescription(service.getDescription()); + notificationData.setServiceInvariantUUID(service.getInvariantUUID()); + workloadContext = resolveWorkloadContext(workloadContext); + if (workloadContext!=null){ + notificationData.setWorkloadContext(workloadContext); + } + logger.debug("Before returning notification data object {}", notificationData); + + return notificationData; + } + + public INotificationData buildServiceForDistribution(INotificationData notificationData, Service service) { + + notificationData.setServiceArtifacts(convertServiceArtifactsToArtifactInfo(service)); + + logger.debug("Before returning notification data object {}", notificationData); + + return notificationData; + } + + private List<ArtifactInfoImpl> convertServiceArtifactsToArtifactInfo(Service service) { + + Map<String, ArtifactDefinition> serviceArtifactsMap = service.getDeploymentArtifacts(); + List<ArtifactDefinition> extractedServiceArtifacts = serviceArtifactsMap.values().stream() + //filters all artifacts with existing EsId + .filter(ArtifactDefinition::checkEsIdExist) + //collects all filtered artifacts with existing EsId to List + .collect(Collectors.toList()); + + Optional<ArtifactDefinition> toscaTemplateArtifactOptl = exrtactToscaTemplateArtifact(service); + if(toscaTemplateArtifactOptl.isPresent()){ + extractedServiceArtifacts.add(toscaTemplateArtifactOptl.get()); + } + + Optional<ArtifactDefinition> toscaCsarArtifactOptl = exrtactToscaCsarArtifact(service); + if(toscaCsarArtifactOptl.isPresent()){ + extractedServiceArtifacts.add(toscaCsarArtifactOptl.get()); + } + + return ArtifactInfoImpl.convertServiceArtifactToArtifactInfoImpl(service, extractedServiceArtifacts); + } + + private Optional<ArtifactDefinition> exrtactToscaTemplateArtifact(Service service) { + return service.getToscaArtifacts().values().stream() + //filters TOSCA_TEMPLATE artifact + .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_TEMPLATE.getType())).findAny(); + } + + private Optional<ArtifactDefinition> exrtactToscaCsarArtifact(Service service) { + return service.getToscaArtifacts().values().stream() + //filters TOSCA_CSAR artifact + .filter(e -> e.getArtifactType().equals(ArtifactTypeEnum.TOSCA_CSAR.getType())).findAny(); + } + + private List<JsonContainerResourceInstance> convertRIsToJsonContanier(Service service) { + List<JsonContainerResourceInstance> ret = new ArrayList<>(); + if (service.getComponentInstances() != null) { + for (ComponentInstance instance : service.getComponentInstances()) { + JsonContainerResourceInstance jsonContainer = new JsonContainerResourceInstance(instance, convertToArtifactsInfoImpl(service, instance)); + ComponentParametersView filter = new ComponentParametersView(); + filter.disableAll(); + filter.setIgnoreCategories(false); + toscaOperationFacade.getToscaElement(instance.getComponentUid(), filter) + .left() + .bind(r->{fillJsonContainer(jsonContainer, (Resource) r); return Either.left(r);}) + .right() + .forEach(r->logger.debug("Resource {} Invariant UUID & Categories retrieving failed", instance.getComponentUid())); + ret.add(jsonContainer); + } + } + return ret; + } + + private void fillJsonContainer(JsonContainerResourceInstance jsonContainer, Resource resource) { + jsonContainer.setResourceInvariantUUID(resource.getInvariantUUID()); + setCategories(jsonContainer, resource.getCategories()); + } + + private List<ArtifactInfoImpl> convertToArtifactsInfoImpl(Service service, ComponentInstance resourceInstance) { + List<ArtifactInfoImpl> artifacts = ArtifactInfoImpl.convertToArtifactInfoImpl(service, resourceInstance, getArtifactsWithPayload(resourceInstance)); + artifacts.stream().forEach(ArtifactInfoImpl::updateArtifactTimeout); + return artifacts; + } + + private void setCategories(JsonContainerResourceInstance jsonContainer, List<CategoryDefinition> categories) { + if (categories != null) { + CategoryDefinition categoryDefinition = categories.get(0); + + if (categoryDefinition != null) { + jsonContainer.setCategory(categoryDefinition.getName()); + List<SubCategoryDefinition> subcategories = categoryDefinition.getSubcategories(); + if (null != subcategories) { + SubCategoryDefinition subCategoryDefinition = subcategories.get(0); + + if (subCategoryDefinition != null) { + jsonContainer.setSubcategory(subCategoryDefinition.getName()); + } + } + } + } + } + + private List<ArtifactDefinition> getArtifactsWithPayload(ComponentInstance resourceInstance) { + List<ArtifactDefinition> ret = new ArrayList<>(); + + List<ArtifactDefinition> deployableArtifacts = new ArrayList<>(); + if (resourceInstance.getDeploymentArtifacts() != null) { + deployableArtifacts.addAll(resourceInstance.getDeploymentArtifacts().values()); + } + + for (ArtifactDefinition artifactDef : deployableArtifacts) { + if (artifactDef.checkEsIdExist()) { + ret.add(artifactDef); + } + } + + return ret; + } + + /** + * build the URL for resource instance artifact + * + * @param service + * @param resourceInstance + * @param artifactName + * @return URL string + */ + public static String buildResourceInstanceArtifactUrl(Service service, ComponentInstance resourceInstance, + String artifactName) { + + String url = String.format(RESOURCE_INSTANCE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), + resourceInstance.getNormalizedName(), artifactName); + + logger.debug("After building artifact url {}", url); + + return url; + } + + /** + * build the URL for resource instance artifact + * + * @param service + * @param artifactName + * @return URL string + */ + public static String buildServiceArtifactUrl(Service service, String artifactName) { + + String url = String.format(SERVICE_ARTIFACT_URL, service.getSystemName(), service.getVersion(), artifactName); + + logger.debug("After building artifact url {}", url); + + return url; + + } + + /** + * Verifies that the service or at least one of its instance contains deployment artifacts + * + * @param the service + * @return boolean + */ + public boolean verifyServiceContainsDeploymentArtifacts(Service service) { + if (MapUtils.isNotEmpty(service.getDeploymentArtifacts())) { + return true; + } + boolean contains = false; + List<ComponentInstance> resourceInstances = service.getComponentInstances(); + if (CollectionUtils.isNotEmpty(resourceInstances)) { + contains = resourceInstances.stream().anyMatch(i -> isContainsPayload(i.getDeploymentArtifacts())); + } + return contains; + } + + private boolean isContainsPayload(Map<String, ArtifactDefinition> deploymentArtifacts) { + return deploymentArtifacts != null && deploymentArtifacts.values().stream().anyMatch(ArtifactDefinition::checkEsIdExist); + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java index f8c0e3f593..3477648dbd 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/SubscriberTypeEnum.java @@ -22,5 +22,5 @@ package org.openecomp.sdc.be.components.distribution.engine; public enum SubscriberTypeEnum { - CONSUMER, PRODUCER; + CONSUMER, PRODUCER; } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java index c522ca91b5..7a25c2ed69 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/UebHealthCheckCall.java @@ -20,58 +20,58 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.concurrent.Callable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; + public class UebHealthCheckCall implements Callable<Boolean> { - CambriaHandler cambriaHandler = new CambriaHandler(); + CambriaHandler cambriaHandler = new CambriaHandler(); - String server; - String publicApiKey; + String server; + String publicApiKey; - private static Logger healthLogger = LoggerFactory.getLogger(DistributionEngineClusterHealth.UEB_HEALTH_LOG_CONTEXT); + private static final Logger healthLogger = LoggerFactory.getLogger(DistributionEngineClusterHealth.UEB_HEALTH_LOG_CONTEXT); - private static Logger logger = LoggerFactory.getLogger(UebHealthCheckCall.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(UebHealthCheckCall.class); - public UebHealthCheckCall(String server, String publicApiKey) { - super(); - this.server = server; - this.publicApiKey = publicApiKey; - } + public UebHealthCheckCall(String server, String publicApiKey) { + super(); + this.server = server; + this.publicApiKey = publicApiKey; + } - @Override - public Boolean call() { + @Override + public Boolean call() { - healthLogger.trace("Going to run health check towards ueb server {}", server); + healthLogger.trace("Going to run health check towards ueb server {}", server); - boolean result = false; - CambriaErrorResponse cambriaErrorResponse = cambriaHandler.getApiKey(server, publicApiKey); + boolean result = false; + CambriaErrorResponse cambriaErrorResponse = cambriaHandler.getApiKey(server, publicApiKey); - logger.debug("After running Health check towards ueb server {}. Result is {}", server, cambriaErrorResponse); + logger.debug("After running Health check towards ueb server {}. Result is {}", server, cambriaErrorResponse); - if (cambriaErrorResponse.httpCode < CambriaErrorResponse.HTTP_INTERNAL_SERVER_ERROR) { - logger.debug("After running Health check towards ueb server {}. Error code is {}. Set result to true", server, cambriaErrorResponse.httpCode); - result = true; - } + if (cambriaErrorResponse.httpCode < CambriaErrorResponse.HTTP_INTERNAL_SERVER_ERROR) { + logger.debug("After running Health check towards ueb server {}. Error code is {}. Set result to true", server, cambriaErrorResponse.httpCode); + result = true; + } - healthLogger.trace("Result after running health check towards ueb server {} is {}. Returned result is {} ", server, cambriaErrorResponse, result); + healthLogger.trace("Result after running health check towards ueb server {} is {}. Returned result is {} ", server, cambriaErrorResponse, result); - return result; - } + return result; + } - public String getServer() { - return server; - } + public String getServer() { + return server; + } - public CambriaHandler getCambriaHandler() { - return cambriaHandler; - } + public CambriaHandler getCambriaHandler() { + return cambriaHandler; + } - public void setCambriaHandler(CambriaHandler cambriaHandler) { - this.cambriaHandler = cambriaHandler; - } + public void setCambriaHandler(CambriaHandler cambriaHandler) { + this.cambriaHandler = cambriaHandler; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java index d706e40f5c..8f0865f2ac 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/VfModuleArtifactPayload.java @@ -20,105 +20,100 @@ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - import org.openecomp.sdc.be.model.GroupDefinition; import org.openecomp.sdc.be.model.GroupInstance; import org.openecomp.sdc.be.model.GroupInstanceProperty; import org.openecomp.sdc.be.model.GroupProperty; import org.openecomp.sdc.common.api.Constants; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + public class VfModuleArtifactPayload { - - private String vfModuleModelName, vfModuleModelInvariantUUID, vfModuleModelVersion, vfModuleModelUUID, vfModuleModelCustomizationUUID, vfModuleModelDescription; - private Boolean isBase; - private List<String> artifacts; - private Map< String, Object> properties; - - public VfModuleArtifactPayload(GroupDefinition group) { - vfModuleModelName = group.getName(); - vfModuleModelInvariantUUID = group.getInvariantUUID(); - vfModuleModelVersion = group.getVersion(); - vfModuleModelUUID = group.getGroupUUID(); - vfModuleModelDescription = group.getDescription(); - - artifacts = group.getArtifactsUuid(); - // Base Value is set from properties - setBaseValue(group); - - } - - public VfModuleArtifactPayload(GroupInstance group) { - vfModuleModelName = group.getGroupName(); - vfModuleModelInvariantUUID = group.getInvariantUUID(); - vfModuleModelVersion = group.getVersion(); - vfModuleModelUUID = group.getGroupUUID(); - vfModuleModelCustomizationUUID = group.getCustomizationUUID(); - vfModuleModelDescription = group.getDescription(); - - artifacts = group.getArtifactsUuid(); - artifacts.addAll(group.getGroupInstanceArtifactsUuid()); - - // Base Value is set from properties - setBaseValue(group); - - if(group.convertToGroupInstancesProperties() != null) - setProperties(group.convertToGroupInstancesProperties()); - //converts List of GroupInstanceProperties to Map propertyName : GroupInstanceProperty () - //setProperties(group.getGroupInstancesProperties().stream().collect(Collectors.toMap(p->p.getName(), p->p))); - - } - - private void setBaseValue(GroupInstance group) { - if (group.convertToGroupInstancesProperties() != null) { - Optional<GroupInstanceProperty> findBaseProperty = group.convertToGroupInstancesProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny(); - if (findBaseProperty.isPresent()) { - isBase = Boolean.valueOf(findBaseProperty.get().getValue()); - } - - } - } - private void setBaseValue(GroupDefinition group) { - if (group.getProperties() != null) { - Optional<GroupProperty> findBaseProperty = group.convertToGroupProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny(); - if (findBaseProperty.isPresent()) { - isBase = Boolean.valueOf(findBaseProperty.get().getValue()); - } - - } - } - - - - public List<String> getArtifacts() { - return artifacts; - } - - public void setArtifacts(List<String> artifacts) { - this.artifacts = artifacts; - } - - - - public Map<String, Object> getProperties() { - return properties; - } - - /*public void setProperties(Map<String, Object> properties) { - this.properties = properties; - }*/ - - public void setProperties(List<GroupInstanceProperty> properties) { - this.properties = properties.stream().filter(p -> !p.getName().equals(Constants.IS_BASE)).collect( - Collectors.toMap(x -> x.getName(), x -> x.getValue() == null? "":x.getValue() )); - } - - public static int compareByGroupName(VfModuleArtifactPayload art1, VfModuleArtifactPayload art2) { - Float thisCounter = Float.parseFloat(art1.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.')); - Float otherCounter = Float.parseFloat(art2.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.')); - return thisCounter.compareTo(otherCounter); - } + + private String vfModuleModelName, vfModuleModelInvariantUUID, vfModuleModelVersion, vfModuleModelUUID, vfModuleModelCustomizationUUID, vfModuleModelDescription; + private Boolean isBase; + private List<String> artifacts; + private Map< String, Object> properties; + + public VfModuleArtifactPayload(GroupDefinition group) { + vfModuleModelName = group.getName(); + vfModuleModelInvariantUUID = group.getInvariantUUID(); + vfModuleModelVersion = group.getVersion(); + vfModuleModelUUID = group.getGroupUUID(); + vfModuleModelDescription = group.getDescription(); + + artifacts = group.getArtifactsUuid(); + // Base Value is set from properties + setBaseValue(group); + + } + + public VfModuleArtifactPayload(GroupInstance group) { + vfModuleModelName = group.getGroupName(); + vfModuleModelInvariantUUID = group.getInvariantUUID(); + vfModuleModelVersion = group.getVersion(); + vfModuleModelUUID = group.getGroupUUID(); + vfModuleModelCustomizationUUID = group.getCustomizationUUID(); + vfModuleModelDescription = group.getDescription(); + + artifacts = new ArrayList<>(group.getArtifactsUuid()); + artifacts.addAll(group.getGroupInstanceArtifactsUuid()); + + // Base Value is set from properties + setBaseValue(group); + + if(group.convertToGroupInstancesProperties() != null) + setProperties(group.convertToGroupInstancesProperties()); + } + + private void setBaseValue(GroupInstance group) { + if (group.convertToGroupInstancesProperties() != null) { + Optional<GroupInstanceProperty> findBaseProperty = group.convertToGroupInstancesProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny(); + if (findBaseProperty.isPresent()) { + isBase = Boolean.valueOf(findBaseProperty.get().getValue()); + } + + } + } + private void setBaseValue(GroupDefinition group) { + if (group.getProperties() != null) { + Optional<GroupProperty> findBaseProperty = group.convertToGroupProperties().stream().filter(p -> p.getName().equals(Constants.IS_BASE)).findAny(); + if (findBaseProperty.isPresent()) { + isBase = Boolean.valueOf(findBaseProperty.get().getValue()); + } + + } + } + + + + public List<String> getArtifacts() { + return artifacts; + } + + public void setArtifacts(List<String> artifacts) { + this.artifacts = artifacts; + } + + + + public Map<String, Object> getProperties() { + return properties; + } + + + public void setProperties(List<GroupInstanceProperty> properties) { + this.properties = properties.stream().filter(p -> !p.getName().equals(Constants.IS_BASE)).collect( + Collectors.toMap(x -> x.getName(), x -> x.getValue() == null? "":x.getValue() )); + } + + public static int compareByGroupName(VfModuleArtifactPayload art1, VfModuleArtifactPayload art2) { + Float thisCounter = Float.parseFloat(art1.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.')); + Float otherCounter = Float.parseFloat(art2.vfModuleModelName.split(Constants.MODULE_NAME_DELIMITER)[1].replace(' ', '.')); + return thisCounter.compareTo(otherCounter); + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java new file mode 100644 index 0000000000..e6f2cc634f --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/config/DistributionEngineSpringConfig.java @@ -0,0 +1,11 @@ +package org.openecomp.sdc.be.components.distribution.engine.config; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ComponentScan({"org.openecomp.sdc.be.components.distribution.engine", + }) +public class DistributionEngineSpringConfig { + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java new file mode 100644 index 0000000000..7f9dd45e67 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/DistributionCompleteReporter.java @@ -0,0 +1,9 @@ +package org.openecomp.sdc.be.components.distribution.engine.report; + +import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotification; + +public interface DistributionCompleteReporter { + + void reportDistributionComplete(DistributionStatusNotification distributionStatusNotification); + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java new file mode 100644 index 0000000000..4e06b0ed0e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/report/MsoDistributionCompleteReporter.java @@ -0,0 +1,19 @@ +package org.openecomp.sdc.be.components.distribution.engine.report; + +import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotification; +import org.openecomp.sdc.be.components.distribution.engine.rest.MSORestClient; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +public class MsoDistributionCompleteReporter implements DistributionCompleteReporter { + + @Resource + private MSORestClient msoClient; + + @Override + public void reportDistributionComplete(DistributionStatusNotification distributionStatusNotification) { + msoClient.notifyDistributionComplete(distributionStatusNotification.getDistributionID(), distributionStatusNotification.getStatus(), distributionStatusNotification.getErrorReason()); + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java new file mode 100644 index 0000000000..45727fb28f --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/DistributionStatusRequest.java @@ -0,0 +1,23 @@ +package org.openecomp.sdc.be.components.distribution.engine.rest; + +/** + * a class which represents an MSO distribution status rest request body + */ +public class DistributionStatusRequest { + + private String status; + private String errorReason; + + public DistributionStatusRequest(String status, String errorReason) { + this.status = status; + this.errorReason = errorReason; + } + + public String getStatus() { + return status; + } + + public String getErrorReason() { + return errorReason; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java new file mode 100644 index 0000000000..76263f942d --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/rest/MSORestClient.java @@ -0,0 +1,69 @@ +package org.openecomp.sdc.be.components.distribution.engine.rest; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.eclipse.jetty.util.URIUtil; +import org.openecomp.sdc.be.components.distribution.engine.DistributionStatusNotificationEnum; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.common.http.client.api.*; +import org.openecomp.sdc.common.http.config.BasicAuthorization; +import org.openecomp.sdc.common.http.config.ExternalServiceConfig; +import org.openecomp.sdc.common.http.config.HttpClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Properties; + +@Component +public class MSORestClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(MSORestClient.class); + private static final Gson gson = new Gson(); + @VisibleForTesting + static final String DISTRIBUTIONS_RESOURCE_CONFIG_PARAM = "distributions"; + + private ExternalServiceConfig serviceConfig = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getMsoConfig(); + + public MSORestClient() { + HttpClientConfig httpClientConfig = serviceConfig.getHttpClientConfig(); + int numOfRetries = httpClientConfig.getNumOfRetries(); + if ( numOfRetries > 0 ) { + httpClientConfig.setRetryHandler(RetryHandlers.getDefault(numOfRetries)); + } + } + + public HttpResponse<String> notifyDistributionComplete(String distributionId, DistributionStatusNotificationEnum distributionStatusEnum, String errReason) { + try { + return doNotifyDistributionComplete(distributionId, distributionStatusEnum, errReason); + } + catch(HttpExecuteException e) { + LOGGER.debug("The request to mso failed with exception ", e); + return Responses.INTERNAL_SERVER_ERROR; + } + } + + private HttpResponse<String> doNotifyDistributionComplete(String distributionId, DistributionStatusNotificationEnum distributionStatusEnum, String errReason) throws HttpExecuteException { + StringEntity entity = new StringEntity(gson.toJson(new DistributionStatusRequest(distributionStatusEnum.name(), errReason)), ContentType.APPLICATION_JSON); + HttpResponse<String> response = HttpRequest.patch(buildMsoDistributionUrl(distributionId), buildReqHeader(), entity, serviceConfig.getHttpClientConfig()); + LOGGER.info("response from mso - status code: {}, status description: {}, response: {}, ", response.getStatusCode(), response.getDescription(), response.getResponse()); + return response; + } + + private Properties buildReqHeader() { + Properties properties = new Properties(); + BasicAuthorization basicAuth = serviceConfig.getHttpClientConfig().getBasicAuthorization(); + RestUtils.addBasicAuthHeader(properties, basicAuth.getUserName(), basicAuth.getPassword()); + return properties; + } + + private String buildMsoDistributionUrl(String distributionId) { + String msoBaseUrl = serviceConfig.getHttpRequestConfig().getServerRootUrl(); + String distributionsPath = serviceConfig.getHttpRequestConfig().getResourceNamespaces().get(DISTRIBUTIONS_RESOURCE_CONFIG_PARAM); + String distributionsApiPath = distributionsPath + URIUtil.SLASH + distributionId; + return msoBaseUrl + distributionsApiPath; + } + +} |