diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/catalog')
9 files changed, 686 insertions, 0 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IComponentMessage.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IComponentMessage.java new file mode 100644 index 0000000000..42a869137b --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IComponentMessage.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.api; + +import org.openecomp.sdc.be.catalog.enums.ChangeTypeEnum; +import org.openecomp.sdc.be.model.CatalogUpdateTimestamp; + +import java.io.Serializable; + + + +/** + * Represent Component (service, resource etc...) change message added to the + * message queue by sdc backend.<br> + * + * @author ms172g + * + */ +public interface IComponentMessage extends Serializable, ITypeMessage { + /** + * Change Type + * @return + */ + ChangeTypeEnum getChangeType(); + CatalogUpdateTimestamp getCatalogUpdateTimestamp(); + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IMessageQueueHandlerProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IMessageQueueHandlerProducer.java new file mode 100644 index 0000000000..21c3ac8f2e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IMessageQueueHandlerProducer.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.api; + +public interface IMessageQueueHandlerProducer { + + IStatus pushMessage(ITypeMessage message); + IStatus init(); + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IStatus.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IStatus.java new file mode 100644 index 0000000000..dd21c2985d --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/IStatus.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.api; + +import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum; + +@FunctionalInterface +public interface IStatus { + + static IStatus getSuccessStatus() { + + return () -> ResultStatusEnum.SUCCESS; + } + + static IStatus getFailStatus() { + return () -> ResultStatusEnum.FAIL; + } + + static IStatus getServiceDisabled() { + return () -> ResultStatusEnum.SERVICE_DISABLED; + } + + ResultStatusEnum getResultStatus(); + +}
\ No newline at end of file diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/ITypeMessage.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/ITypeMessage.java new file mode 100644 index 0000000000..a58e5f87c1 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/api/ITypeMessage.java @@ -0,0 +1,25 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.api; + +public interface ITypeMessage { + String getMessageType(); +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ChangeTypeEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ChangeTypeEnum.java new file mode 100644 index 0000000000..5107468af4 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ChangeTypeEnum.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.enums; + +/** + * Represents The change type SDC Backend requests on the Component.<br> + * @author ms172g + * + */ +public enum ChangeTypeEnum { + LIFECYCLE, DELETE, ARCHIVE, RESTORE +}
\ No newline at end of file diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ResultStatusEnum.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ResultStatusEnum.java new file mode 100644 index 0000000000..53a242380c --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/enums/ResultStatusEnum.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.enums; + +/** + * Simple Status Enum + * @author ms172g + * + */ +public enum ResultStatusEnum { + SUCCESS, FAIL , SERVICE_DISABLED +}
\ No newline at end of file diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/ComponentMessage.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/ComponentMessage.java new file mode 100644 index 0000000000..82b646714e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/ComponentMessage.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.openecomp.sdc.be.catalog.api.IComponentMessage; +import org.openecomp.sdc.be.catalog.enums.ChangeTypeEnum; +import org.openecomp.sdc.be.datatypes.enums.ComponentTypeEnum; +import org.openecomp.sdc.be.model.CatalogUpdateTimestamp; +import org.openecomp.sdc.be.model.Component; +import org.openecomp.sdc.be.model.Resource; +import org.openecomp.sdc.be.model.Service; +import org.openecomp.sdc.be.model.catalog.CatalogComponent; +import org.openecomp.sdc.be.model.category.CategoryDefinition; +import org.openecomp.sdc.be.model.category.SubCategoryDefinition; + +import java.util.List; + +public class ComponentMessage extends CatalogComponent implements IComponentMessage { + /** + * + */ + private static final long serialVersionUID = 3233307722573636520L; + @JsonProperty("changeTypeEnum") + ChangeTypeEnum changeTypeEnum; + @JsonProperty("catalogUpdateTimestamp") + private CatalogUpdateTimestamp catalogUpdateTimestamp; + private Boolean isArchived; + + public ComponentMessage(Component component, ChangeTypeEnum changeTypeEnum, + CatalogUpdateTimestamp catalogUpdateTimestamp) { + super(); + + this.changeTypeEnum = changeTypeEnum; + this.catalogUpdateTimestamp = catalogUpdateTimestamp; + + setUniqueId(component.getUniqueId());// uniqueId + setUuid(component.getUUID()); // uuid + setInvariantUUID(component.getInvariantUUID()); // invariantUUID + + // View Fields + setName(component.getName()); // name + setSystemName(component.getSystemName()); // systemName + + setVersion(component.getVersion());// version + setLifecycleState(component.getLifecycleState() + .name()); // lifecycleState + setIcon(component.getIcon()); // icon + + ComponentTypeEnum componentType = component.getComponentType(); + setComponentType(componentType);// componentType + + buildCategories(component.getCategories()); // categoryNormalizedName, + // subCategoryNormalizedName + if (componentType == ComponentTypeEnum.SERVICE) { + Service service = (Service) component; + setDistributionStatus(service.getDistributionStatus() + .name()); // distributionStatus + } else { + Resource r = (Resource) component; + this.setResourceType(r.getResourceType() + .name()); // resourceType + } + setIsArchived(component.isArchived()); // isArchived + setIsHighestVersion(component.isHighestVersion()); // isHighestVersion + setDescription(component.getDescription()); // description + if (component.getTags() != null) { + setTags(component.getTags()); // tags + } + setLastUpdateDate(component.getLastUpdateDate());// lastUpdateDate + setLastUpdaterUserId(component.getLastUpdaterUserId()); + } + + private void buildCategories(List<CategoryDefinition> categories) { + if (categories != null) { + setCategories(categories); + CategoryDefinition categoryDefinition = categories.get(0); + + if (categoryDefinition != null) { + setCategoryNormalizedName(categoryDefinition.getName()); + List<SubCategoryDefinition> subcategories = categoryDefinition.getSubcategories(); + if (null != subcategories) { + SubCategoryDefinition subCategoryDefinition = subcategories.get(0); + + if (subCategoryDefinition != null) { + setSubCategoryNormalizedName(subCategoryDefinition.getName()); + } + } + } + } + } + + @Override + public ChangeTypeEnum getChangeType() { + return changeTypeEnum; + } + + @Override + public CatalogUpdateTimestamp getCatalogUpdateTimestamp() { + return catalogUpdateTimestamp; + } + + @Override + public String toString() { + return "ComponentMessage [ getChangeType()=" + getChangeType() + ", getCatalogUpdateTimestamp()=" + + getCatalogUpdateTimestamp() + ", getIsArchived()=" + getIsArchived() + ", getUuid()=" + getUuid() + + ", getInvariantUUID()=" + getInvariantUUID() + ", getSystemName()=" + getSystemName() + + ", getDescription()=" + getDescription() + ", getIsHighestVersion()=" + getIsHighestVersion() + + ", getCategoryNormalizedName()=" + getCategoryNormalizedName() + ", getSubCategoryNormalizedName()=" + + getSubCategoryNormalizedName() + ", getResourceType()=" + getResourceType() + ", getName()=" + + getName() + ", getLastUpdateDate()=" + getLastUpdateDate() + ", getVersion()=" + getVersion() + + ", getComponentType()=" + getComponentType() + ", getIcon()=" + getIcon() + ", getUniqueId()=" + + getUniqueId() + ", getLifecycleState()=" + getLifecycleState() + ", getDistributionStatus()=" + + getDistributionStatus() + ", getTags()=" + getTags() + ", getCategories()=" + getCategories() + + ", getClass()=" + getClass() + ", hashCode()=" + hashCode() + ", toString()=" + super.toString() + + "]"; + } + + public Boolean getIsArchived() { + return isArchived; + } + + public void setIsArchived(Boolean isArchived) { + this.isArchived = isArchived; + } + + + @Override + public String getMessageType() { + return getClass().getSimpleName(); + } + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducer.java new file mode 100644 index 0000000000..40bea7b79c --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducer.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.impl; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.openecomp.sdc.be.catalog.api.IMessageQueueHandlerProducer; +import org.openecomp.sdc.be.catalog.api.IStatus; +import org.openecomp.sdc.be.catalog.api.ITypeMessage; +import org.openecomp.sdc.be.catalog.enums.ResultStatusEnum; +import org.openecomp.sdc.be.components.distribution.engine.DmaapClientFactory; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapProducerConfiguration; +import org.openecomp.sdc.common.log.enums.StatusCode; +import org.openecomp.sdc.common.log.wrappers.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Component +public class DmaapProducer implements IMessageQueueHandlerProducer { + private static final Logger LOG = Logger.getLogger(DmaapProducer.class.getName()); + private static final Logger metricLog = Logger.getLogger(DmaapProducer.class.getName()); + + @Autowired + private DmaapClientFactory dmaapClientFactory; + private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager(); + private MRBatchingPublisher publisher; + @Autowired + private DmaapProducerHealth dmaapHealth; + + public MRBatchingPublisher getPublisher() { + return publisher; + } + + @Override + public IStatus pushMessage(ITypeMessage message) { + try { + DmaapProducerConfiguration producerConfiguration = configurationManager.getConfiguration() + .getDmaapProducerConfiguration(); + if (!producerConfiguration.getActive()) { + LOG.info( + "[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent."); + dmaapHealth.report(false); + return IStatus.getServiceDisabled(); + } + if (publisher == null) { + IStatus initStatus = init(); + if (initStatus.getResultStatus() != ResultStatusEnum.SUCCESS) { + + return initStatus; + } + } + ObjectMapper mapper = new ObjectMapper(); + String jsonInString = mapper.writeValueAsString(message); + if (publisher != null) { + LOG.info("before send message . response {}", jsonInString); + + LOG.invoke("Dmaap Producer", "DmaapProducer-pushMessage", DmaapProducer.class.getName(), message.toString()); + + int pendingMsg = publisher.send(jsonInString); + LOG.info("sent message . response {}", pendingMsg); + LOG.invokeReturn(producerConfiguration.getConsumerId(), "Dmaap Producer", StatusCode.COMPLETE.getStatusCodeEnum(), "DmaapProducer-pushMessage",message.toString(), pendingMsg ); + + } + + + + dmaapHealth.report(true); + } catch (Exception e) { + LOG.error("Failed to send message . Exception {}", e.getMessage()); + return IStatus.getFailStatus(); + } + + return IStatus.getSuccessStatus(); + } + + @PostConstruct + @Override + public IStatus init() { + LOG.debug("MessageQueueHandlerProducer:: Start initializing"); + DmaapProducerConfiguration configuration = configurationManager.getConfiguration() + .getDmaapProducerConfiguration(); + if (configuration.getActive()) { + try { + publisher = dmaapClientFactory.createProducer(configuration); + if (publisher == null) { + LOG.error("Failed to connect to topic "); + dmaapHealth.report(false); + return IStatus.getFailStatus(); + } + + } catch (Exception e) { + LOG.error("Failed to connect to topic . Exeption {}", e.getMessage()); + dmaapHealth.report(false); + return IStatus.getFailStatus(); + } + dmaapHealth.report(true); + return IStatus.getSuccessStatus(); + } + LOG.info("[Microservice DMAAP] producer is disabled [re-enable in configuration->isActive],message not sent."); + dmaapHealth.report(false); + return IStatus.getServiceDisabled(); + } + + @PreDestroy + public void shutdown() { + LOG.debug("DmaapProducer::shutdown..."); + try { + if (publisher != null) { + publisher.close(); + } + } catch (Exception e) { + LOG.error("Failed to close messageQ . Exeption {}", e.getMessage()); + + } + + } + +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducerHealth.java b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducerHealth.java new file mode 100644 index 0000000000..b62df86b4b --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/catalog/impl/DmaapProducerHealth.java @@ -0,0 +1,194 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.catalog.impl; + +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapProducerConfiguration; +import org.openecomp.sdc.common.api.Constants; +import org.openecomp.sdc.common.api.HealthCheckInfo; +import org.openecomp.sdc.common.log.wrappers.Logger; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component("dmaapProducerHealth") +public class DmaapProducerHealth { + + + private static final String DMAAP_HEALTH_LOG_CONTEXT = "dmaapProducer.healthcheck"; + private static final String DMAAP_HEALTH_CHECK_STR = "dmaapProducerHealthCheck"; + private static final Logger log = Logger.getLogger(DmaapProducerHealth.class.getName()); + private static final Logger logHealth = Logger.getLogger(DMAAP_HEALTH_LOG_CONTEXT); + private HealthCheckInfo healthCheckInfo = DmaapProducerHealth.HealthCheckInfoResult.UNAVAILABLE.getHealthCheckInfo(); + private long healthCheckReadTimeout = 20; + private long reconnectInterval = 5; + private HealthCheckScheduledTask healthCheckScheduledTask = null ; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private ScheduledFuture<?> scheduledFuture = null; + private DmaapProducerConfiguration configuration = null ; + + private volatile AtomicBoolean lastHealthState = new AtomicBoolean(false); + private volatile AtomicBoolean reportedHealthState = null; + + public enum HealthCheckInfoResult { + OK(new HealthCheckInfo(Constants.HC_COMPONENT_DMAAP_PRODUCER, HealthCheckInfo.HealthCheckStatus.UP, null, DmaapStatusDescription.OK.getDescription())), + UNAVAILABLE(new HealthCheckInfo(Constants.HC_COMPONENT_DMAAP_PRODUCER, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.UNAVAILABLE.getDescription())), + DOWN(new HealthCheckInfo(Constants.HC_COMPONENT_DMAAP_PRODUCER, HealthCheckInfo.HealthCheckStatus.DOWN, null, DmaapStatusDescription.DOWN.getDescription())); + + private HealthCheckInfo healthCheckInfo; + HealthCheckInfoResult(HealthCheckInfo healthCheckInfo) { + this.healthCheckInfo = healthCheckInfo; + } + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } + } + + public enum DmaapStatusDescription { + OK("OK"), UNAVAILABLE("DmaapProducer is not available"),DOWN("DOWN"), NOT_CONFIGURED("DmaapProducer configuration is missing/wrong "); + + private String desc; + DmaapStatusDescription(String desc) { + this.desc = desc; + } + public String getDescription() { + return desc; + } + + } + + @PostConstruct + public DmaapProducerHealth init() { + log.trace("Enter init method of DmaapProducer health"); + synchronized (DmaapProducerHealth.class){ + this.configuration = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapProducerConfiguration(); + + Integer pollingInterval = configuration.getPollingInterval(); + if (pollingInterval != null && pollingInterval!=0) { + reconnectInterval = pollingInterval; + } + Integer healthCheckReadTimeoutConfig = configuration.getTimeoutMs(); + if (healthCheckReadTimeoutConfig != null) { + this.healthCheckReadTimeout = healthCheckReadTimeoutConfig; + } + this.healthCheckScheduledTask = new HealthCheckScheduledTask( configuration ); //what is the representation? csv? delimiter? json or other + startHealthCheckTask(true); + } + log.trace("Exit init method of DistributionEngineClusterHealth"); + return this; + } + + @PreDestroy + protected void destroy() { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + if (scheduler != null) { + scheduler.shutdown(); + } + } + + /** + * Start health check task. + * + * @param startTask + */ + private void startHealthCheckTask(boolean startTask) { + synchronized (DmaapProducerHealth.class){ + if (startTask && this.scheduledFuture == null) { + this.scheduledFuture = this.scheduler.scheduleAtFixedRate(this.healthCheckScheduledTask , 0, reconnectInterval, TimeUnit.SECONDS); + } + } + } + + void report(Boolean isUp){ + if (reportedHealthState == null) + reportedHealthState = new AtomicBoolean(isUp); + reportedHealthState.set(isUp); + } + + + public HealthCheckInfo getHealthCheckInfo() { + return healthCheckInfo; + } + + /** + * Health Check Task Scheduler - infinite check. + */ + public class HealthCheckScheduledTask implements Runnable { + private final DmaapProducerConfiguration config; + private static final int TIMEOUT = 8192; + + HealthCheckScheduledTask(final DmaapProducerConfiguration config){ + this.config = config; + } + @Override + public void run() { + logHealth.trace("Executing Dmaap Health Check Task - Start"); + boolean prevIsReachable; + boolean reachable; + //first try simple ping + try{ + if ( reportedHealthState != null ){ + reachable = reportedHealthState.get(); + } + else{ + reachable = false; + } + prevIsReachable = lastHealthState.getAndSet( reachable ); + healthCheckInfo = reachable ? HealthCheckInfoResult.OK.healthCheckInfo : HealthCheckInfoResult.DOWN.healthCheckInfo; + } + catch( Exception e ){ + log.debug("{} - cannot check connectivity -> {}",DMAAP_HEALTH_CHECK_STR, e ); + prevIsReachable = lastHealthState.getAndSet(false); + healthCheckInfo = HealthCheckInfoResult.UNAVAILABLE.healthCheckInfo; + } + if (prevIsReachable != lastHealthState.get()) + logAlarm( lastHealthState.get() ); + } + + + + + private void logAlarm(boolean lastHealthState) { + try{ + if ( lastHealthState ) { + BeEcompErrorManager.getInstance().logDmaapHealthCheckRecovery( DMAAP_HEALTH_CHECK_STR ); + } else { + BeEcompErrorManager.getInstance().logDmaapHealthCheckError( DMAAP_HEALTH_CHECK_STR ); + } + }catch( Exception e ){ + log.debug("cannot logAlarm -> {}" ,e ); + } + } + + } + + +} |