From c465f8fac8cc8be671319fca2100e98fc4a4c13f Mon Sep 17 00:00:00 2001 From: vasraz Date: Mon, 19 Jun 2023 17:51:23 +0100 Subject: Disable DMaaP if Kafka active DMaaP healthcheck cannot succeed when using Kafka, which in turn causes distribution to fail Signed-off-by: Vasyl Razinkov Change-Id: Ia6e1e35b9a2e819e38e62caeb797342948f34e92 Issue-ID: SDC-4542 --- .../distribution/engine/DistributionEngine.java | 59 +++++++++++++--------- .../be/components/impl/ServiceBusinessLogic.java | 41 ++++++++------- .../sdc/config/CatalogBESpringConfig.java | 1 + 3 files changed, 58 insertions(+), 43 deletions(-) (limited to 'catalog-be/src/main/java') diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java index cb14ebeebe..cebedadf10 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java @@ -19,18 +19,10 @@ */ package org.openecomp.sdc.be.components.distribution.engine; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.annotation.Resource; +import lombok.Setter; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.components.validation.ServiceDistributionValidation; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.ConfigurationManager; @@ -44,15 +36,29 @@ import org.openecomp.sdc.common.log.wrappers.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + @Component("distributionEngine") public class DistributionEngine implements IDistributionEngine { private static final Logger logger = Logger.getLogger(DistributionEngine.class.getName()); private static final Pattern FQDN_PATTERN = Pattern.compile( - "^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,4})*$", - Pattern.CASE_INSENSITIVE); + "^([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9])(\\.([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9]))*(:[0-9]{2,4})*$", + Pattern.CASE_INSENSITIVE); @Autowired private EnvironmentsEngine environmentsEngine; + @Autowired + @Setter + private KafkaHandler kafkaHandler; @Resource private DistributionNotificationSender distributionNotificationSender; @Resource @@ -83,7 +89,7 @@ public class DistributionEngine implements IDistributionEngine { private void init() { logger.trace("Enter init method of DistributionEngine"); DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager() - .getDistributionEngineConfiguration(); + .getDistributionEngineConfiguration(); boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine(); logger.debug("Distribution engine activation parameter is {}", startDistributionEngine); if (!startDistributionEngine) { @@ -94,7 +100,7 @@ public class DistributionEngine implements IDistributionEngine { boolean isValidConfig = validateConfiguration(distributionEngineConfiguration); if (!isValidConfig) { BeEcompErrorManager.getInstance() - .logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase"); + .logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase"); this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError(); return; } @@ -133,13 +139,16 @@ public class DistributionEngine implements IDistributionEngine { */ protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) { String methodName = "validateConfiguration"; - boolean result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers"); - result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments") && result; - result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result; - result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result; + boolean result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments"); + + if (!kafkaHandler.isKafkaActive()) { + result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result; + result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result; + result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result; + result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result; + } result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result; result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result; - result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result; result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result; result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result; result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result; @@ -251,13 +260,13 @@ public class DistributionEngine implements IDistributionEngine { public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName, User modifier) { logger.debug( - "Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", - distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(), modifier); + "Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", + distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(), modifier); String topicName = buildTopicName(envName); ActionStatus notifyServiceStatus = Optional.ofNullable(environmentsEngine.getEnvironmentById(envId)).map(EnvironmentMessageBusData::new).map( - messageBusData -> distributionNotificationSender - .sendNotification(topicName, distributionId, messageBusData, notificationData, service, modifier)) - .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE); + messageBusData -> distributionNotificationSender + .sendNotification(topicName, distributionId, messageBusData, notificationData, service, modifier)) + .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE); logger.debug("Finish notifyService. status is {}", notifyServiceStatus); return notifyServiceStatus; } @@ -274,7 +283,7 @@ public class DistributionEngine implements IDistributionEngine { if (status != StorageOperationStatus.OK) { String envErrorDec = getEnvironmentErrorDescription(status); BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING, - "Environment name " + envName + " is not available. Reason : " + envErrorDec); + "Environment name " + envName + " is not available. Reason : " + envErrorDec); } return status; } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java index cacbd1369d..e641365afa 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java @@ -69,6 +69,7 @@ import org.openecomp.sdc.be.components.health.HealthCheckBusinessLogic; import org.openecomp.sdc.be.components.impl.exceptions.ByActionStatusComponentException; import org.openecomp.sdc.be.components.impl.exceptions.ByResponseFormatComponentException; import org.openecomp.sdc.be.components.impl.exceptions.ComponentException; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.components.path.ForwardingPathValidator; import org.openecomp.sdc.be.components.utils.InterfaceOperationUtils; import org.openecomp.sdc.be.components.utils.PropertiesUtils; @@ -176,7 +177,7 @@ import org.springframework.web.context.WebApplicationContext; @org.springframework.stereotype.Component("serviceBusinessLogic") public class ServiceBusinessLogic extends ComponentBusinessLogic { - static final String IS_VALID = "isValid"; + private static final String IS_VALID = "isValid"; private static final String THE_SERVICE_WITH_SYSTEM_NAME_LOCKED = "The service with system name {} locked. "; private static final String FAILED_TO_LOCK_SERVICE_RESPONSE_IS = "Failed to lock service {}. Response is {}. "; private static final String AUDIT_BEFORE_SENDING_RESPONSE = "audit before sending response"; @@ -196,6 +197,7 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { private final ServiceCategoryValidator serviceCategoryValidator; private final ServiceValidator serviceValidator; private final GroupBusinessLogic groupBusinessLogic; + private final KafkaHandler kafkaHandler; private ForwardingPathOperation forwardingPathOperation; private AuditCassandraDao auditCassandraDao; private ServiceTypeValidator serviceTypeValidator; @@ -214,7 +216,7 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { ComponentDescriptionValidator componentDescriptionValidator, ModelOperation modelOperation, final ServiceRoleValidator serviceRoleValidator, final ServiceInstantiationTypeValidator serviceInstantiationTypeValidator, - final ServiceCategoryValidator serviceCategoryValidator, final ServiceValidator serviceValidator) { + final ServiceCategoryValidator serviceCategoryValidator, final ServiceValidator serviceValidator, KafkaHandler kafkaHandler) { super(elementDao, groupOperation, groupInstanceOperation, groupTypeOperation, groupBusinessLogic, interfaceOperation, interfaceLifecycleTypeOperation, artifactsBusinessLogic, artifactToscaOperation, componentContactIdValidator, componentNameValidator, componentTagsValidator, componentValidator, componentIconValidator, componentProjectCodeValidator, componentDescriptionValidator); @@ -229,6 +231,7 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { this.serviceCategoryValidator = serviceCategoryValidator; this.serviceValidator = serviceValidator; this.groupBusinessLogic = groupBusinessLogic; + this.kafkaHandler = kafkaHandler; } @Autowired @@ -1730,22 +1733,24 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { log.trace("Update environment name to be {} instead of {}", configuredEnvName, envName); envName = configuredEnvName; } - // DE194021 - ServletContext servletContext = request.getSession().getServletContext(); - boolean isDistributionEngineUp = getHealthCheckBL(servletContext).isDistributionEngineUp(); // DE - if (!isDistributionEngineUp) { - BeEcompErrorManager.getInstance().logBeSystemError("Distribution Engine is DOWN"); - log.debug("Distribution Engine is DOWN"); - response = componentsUtils.getResponseFormat(ActionStatus.GENERAL_ERROR); - return Either.right(response); + if (!kafkaHandler.isKafkaActive()) { + // DE194021 + ServletContext servletContext = request.getSession().getServletContext(); + boolean isDistributionEngineUp = getHealthCheckBL(servletContext).isDistributionEngineUp(); // DE + if (!isDistributionEngineUp) { + BeEcompErrorManager.getInstance().logBeSystemError("Distribution Engine is DOWN"); + log.debug("Distribution Engine is DOWN"); + response = componentsUtils.getResponseFormat(ActionStatus.GENERAL_ERROR); + return Either.right(response); + } } Either serviceRes = toscaOperationFacade.getToscaElement(serviceId); if (serviceRes.isRight()) { log.debug("failed retrieving service"); response = componentsUtils - .getResponseFormat(componentsUtils.convertFromStorageResponse(serviceRes.right().value(), ComponentTypeEnum.SERVICE), serviceId); + .getResponseFormat(componentsUtils.convertFromStorageResponse(serviceRes.right().value(), ComponentTypeEnum.SERVICE), serviceId); componentsUtils.auditComponent(response, user, null, AuditingActionEnum.DISTRIBUTION_STATE_CHANGE_REQUEST, - new ResourceCommonInfo(ComponentTypeEnum.SERVICE.getValue()), ResourceVersionInfo.newBuilder().build(), did); + new ResourceCommonInfo(ComponentTypeEnum.SERVICE.getValue()), ResourceVersionInfo.newBuilder().build(), did); return Either.right(response); } Service service = serviceRes.left().value(); @@ -1756,7 +1761,7 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { if (service.getLifecycleState() != LifecycleStateEnum.CERTIFIED) { log.info("service {} is not available for distribution. Should be in certified state", service.getUniqueId()); ResponseFormat responseFormat = componentsUtils - .getResponseFormat(ActionStatus.SERVICE_NOT_AVAILABLE_FOR_DISTRIBUTION, service.getVersion(), service.getName()); + .getResponseFormat(ActionStatus.SERVICE_NOT_AVAILABLE_FOR_DISTRIBUTION, service.getVersion(), service.getName()); return Either.right(responseFormat); } String dcurrStatus = service.getDistributionStatus().name(); @@ -1767,7 +1772,7 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { ActionStatus notifyServiceResponse = distributionEngine.notifyService(did, service, notificationData, envName, user); if (notifyServiceResponse == ActionStatus.OK) { Either updateStateRes = updateDistributionStatusForActivation(service, user, - DistributionStatusEnum.DISTRIBUTED); + DistributionStatusEnum.DISTRIBUTED); if (updateStateRes.isLeft() && updateStateRes.left().value() != null) { updatedService = updateStateRes.left().value(); updatedStatus = updatedService.getDistributionStatus().name(); @@ -1786,13 +1791,13 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { } } else { response = componentsUtils - .getResponseFormatByDE(componentsUtils.convertFromStorageResponse(readyForDistribution, ComponentTypeEnum.SERVICE), envName); + .getResponseFormatByDE(componentsUtils.convertFromStorageResponse(readyForDistribution, ComponentTypeEnum.SERVICE), envName); result = Either.right(response); } componentsUtils.auditComponent(response, user, service, AuditingActionEnum.DISTRIBUTION_STATE_CHANGE_REQUEST, - new ResourceCommonInfo(service.getName(), ComponentTypeEnum.SERVICE.getValue()), - ResourceVersionInfo.newBuilder().distributionStatus(dcurrStatus).build(), - ResourceVersionInfo.newBuilder().distributionStatus(updatedStatus).build(), null, null, did); + new ResourceCommonInfo(service.getName(), ComponentTypeEnum.SERVICE.getValue()), + ResourceVersionInfo.newBuilder().distributionStatus(dcurrStatus).build(), + ResourceVersionInfo.newBuilder().distributionStatus(updatedStatus).build(), null, null, did); return result; } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/config/CatalogBESpringConfig.java b/catalog-be/src/main/java/org/openecomp/sdc/config/CatalogBESpringConfig.java index b38210116c..5c018fc0f0 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/config/CatalogBESpringConfig.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/config/CatalogBESpringConfig.java @@ -58,6 +58,7 @@ import org.springframework.core.annotation.Order; "org.openecomp.sdc.be.components.csar", "org.openecomp.sdc.be.components.property", "org.openecomp.sdc.be.components.attribute", + "org.openecomp.sdc.be.components.kafka", "org.openecomp.sdc.be.csar.security", "org.openecomp.sdc.be.datamodel.utils", "org.openecomp.sdc.be.components.upgrade", -- cgit 1.2.3-korg