From a57452cf03010acee39542550db00d428a300863 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Fri, 8 Nov 2024 13:36:15 +0000 Subject: Add Kafka Health Check in ACM Issue-ID: POLICY-5203 Change-Id: Id1705ac74d53cd5f2f8c3d81aae2366baf774fcf Signed-off-by: FrancescoFioraEst --- .../src/main/resources/config/application.yaml | 6 + .../intermediary/handler/BrokerStarter.java | 140 +++++++++++++++++++++ .../handler/IntermediaryActivator.java | 75 +++-------- .../ParticipantIntermediaryParameters.java | 6 +- .../intermediary/handler/BrokerStarterTest.java | 113 +++++++++++++++++ .../handler/IntermediaryActivatorTest.java | 12 +- 6 files changed, 285 insertions(+), 67 deletions(-) create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java create mode 100644 participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java (limited to 'participant') diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml index 3f4d79472..d87219bd6 100644 --- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml @@ -20,6 +20,12 @@ participant: reportingTimeIntervalMs: 120000 description: Participant Description participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90 + topicValidation: true + clampAdminTopics: + servers: + - ${topicServer:kafka:9092} + topicCommInfrastructure: NOOP + fetchTimeout: 15000 clampAutomationCompositionTopics: topicSources: - topic: ${participant.intermediaryParameters.topics.operationTopic} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java new file mode 100644 index 000000000..1c862e925 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.participant.intermediary.handler; + +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory; +import org.onap.policy.common.parameters.topic.TopicParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Component +public class BrokerStarter { + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerStarter.class); + private final IntermediaryActivator activator; + private final ParticipantHandler participantHandler; + private final TopicHealthCheck topicHealthCheck; + + private final ParticipantParameters parameters; + private final List publishers; + private final List> listeners; + + /** + * Constructor. + * + * @param parameters ParticipantParameters + * @param activator IntermediaryActivator + * @param participantHandler participantHandler + */ + public BrokerStarter(ParticipantParameters parameters, + List publishers, List> listeners, IntermediaryActivator activator, + ParticipantHandler participantHandler) { + this.parameters = parameters; + this.listeners = listeners; + this.publishers = publishers; + this.activator = activator; + this.participantHandler = participantHandler; + var topic = parameters.getIntermediaryParameters().getClampAdminTopics(); + if (topic == null) { + topic = new TopicParameters(); + topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name()); + } + this.topicHealthCheck = createTopicHealthCheck(topic); + } + + protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) { + return new TopicHealthCheckFactory().getTopicHealthCheck(topic); + } + + /** + * Handle ContextRefreshEvent. + * + * @param ctxRefreshedEvent ContextRefreshedEvent + */ + @EventListener + public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { + if (!activator.isAlive()) { + runTopicHealthCheck(); + start(); + } + } + + private void runTopicHealthCheck() { + var fetchTimeout = getFetchTimeout(); + while (!topicHealthCheck.healthCheck(getTopics())) { + LOGGER.debug(" Broker not up yet!"); + try { + Thread.sleep(fetchTimeout); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage()); + Thread.currentThread().interrupt(); + } + } + } + + private List getTopics() { + var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic(); + var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic(); + return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation()) + ? List.of(opTopic, syncTopic) : List.of(); + } + + private int getFetchTimeout() { + int fetchTimeout = parameters.getIntermediaryParameters().getClampAdminTopics() == null + ? 0 : parameters.getIntermediaryParameters().getClampAdminTopics().getFetchTimeout(); + return Math.max(fetchTimeout, 5000); + } + + private void start() { + activator.config(parameters, publishers, listeners); + activator.start(); + var task = new TimerTask() { + @Override + public void run() { + new Thread(participantHandler::sendParticipantRegister).start(); + } + }; + new Timer().schedule(task, 5000); + } + + + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + if (activator.isAlive()) { + participantHandler.sendParticipantDeregister(); + activator.stop(); + } + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java index 388603099..cb8df0a31 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021,2024 Nordix Foundation. + * Copyright (C) 2021,2024-2025 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,9 +23,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; import lombok.Getter; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics; @@ -34,9 +33,6 @@ import org.onap.policy.common.message.bus.event.TopicEndpointManager; import org.onap.policy.common.message.bus.event.TopicSink; import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.common.utils.services.ServiceManagerContainer; -import org.springframework.context.event.ContextClosedEvent; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; /** @@ -48,10 +44,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl private static final String[] MSG_TYPE_NAMES = {"messageType"}; // Topics from which the participant receives and to which the participant sends messages - private final List topicSinks; - private final List topicSources; - - private final ParticipantHandler participantHandler; + private final List topicSinks = new ArrayList<>(); + private final List topicSources = new ArrayList<>(); @Getter private final MessageTypeDispatcher msgDispatcher; @@ -59,27 +53,30 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @Getter private final MessageTypeDispatcher syncMsgDispatcher; + /** + * Constructor. + */ + public IntermediaryActivator() { + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + } + /** * Instantiate the activator for participant. * * @param parameters the ParticipantParameters - * @param participantHandler the ParticipantHandler * @param publishers list of Publishers * @param listeners list of Listeners */ - public IntermediaryActivator(final ParticipantParameters parameters, ParticipantHandler participantHandler, - List publishers, List> listeners) { - this.participantHandler = participantHandler; + public void config(ParticipantParameters parameters, + List publishers, List> listeners) { - topicSinks = TopicEndpointManager.getManager().addTopicSinks( - parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks()); + topicSinks.addAll(TopicEndpointManager.getManager().addTopicSinks( + parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks())); - topicSources = TopicEndpointManager.getManager().addTopicSources( - parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources()); + topicSources.addAll(TopicEndpointManager.getManager().addTopicSources( + parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources())); - msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - - syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); // @formatter:off addAction("Topic endpoint management", @@ -108,42 +105,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl // @formatter:on } - /** - * Handle ContextRefreshEvent. - * - * @param ctxRefreshedEvent ContextRefreshedEvent - */ - @EventListener - public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { - if (!isAlive()) { - start(); - var task = new TimerTask() { - @Override - public void run() { - new Thread(participantHandler::sendParticipantRegister).start(); - } - }; - new Timer().schedule(task, 5000); - } - } - - /** - * Handle ContextClosedEvent. - * - * @param ctxClosedEvent ContextClosedEvent - */ - @EventListener - public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { - if (isAlive()) { - sendParticipantDeregister(); - stop(); - } - } - - private void sendParticipantDeregister() { - participantHandler.sendParticipantDeregister(); - } - /** * Registers the dispatcher with the topic source(s). */ diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java index dad9c8ae7..d94dc5d39 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2024 Nordix Foundation. + * Copyright (C) 2021-2024-2025 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import lombok.Getter; import lombok.Setter; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.common.parameters.topic.TopicParameterGroup; +import org.onap.policy.common.parameters.topic.TopicParameters; import org.onap.policy.common.parameters.validation.ParameterGroupConstraint; /** @@ -64,4 +65,7 @@ public class ParticipantIntermediaryParameters { @Valid private Topics topics = new Topics(); + private Boolean topicValidation = false; + + private TopicParameters clampAdminTopics; } diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java new file mode 100644 index 000000000..093ae9e90 --- /dev/null +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java @@ -0,0 +1,113 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.participant.intermediary.handler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener; +import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData; +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck; +import org.onap.policy.common.parameters.topic.TopicParameters; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; + +class BrokerStarterTest { + + @Test + void testWithClampAdminTopicsNull() { + var parameters = CommonTestData.getParticipantParameters(); + parameters.getIntermediaryParameters().setClampAdminTopics(null); + var publishers = List.of(mock(Publisher.class)); + var listeners = List.of(mock(ParticipantStatusReqListener.class)); + var activator = mock(IntermediaryActivator.class); + var participantHandler = mock(ParticipantHandler.class); + var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler); + when(activator.isAlive()).thenReturn(false); + + brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class)); + verify(activator).start(); + + brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class)); + verify(participantHandler, times(0)).sendParticipantDeregister(); + verify(activator, times(0)).stop(); + } + + @Test + void testAlreadyAlive() { + var parameters = CommonTestData.getParticipantParameters(); + var topic = new TopicParameters(); + topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name()); + parameters.getIntermediaryParameters().setClampAdminTopics(topic); + var publishers = List.of(mock(Publisher.class)); + var listeners = List.of(mock(ParticipantStatusReqListener.class)); + var activator = mock(IntermediaryActivator.class); + var participantHandler = mock(ParticipantHandler.class); + var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler); + + when(activator.isAlive()).thenReturn(true); + brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class)); + verify(activator, times(0)).start(); + + brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class)); + verify(activator).stop(); + verify(participantHandler).sendParticipantDeregister(); + } + + private static class DummyTopicHealthCheck implements TopicHealthCheck { + + int count = 0; + + // first call is false, next will be true + @Override + public boolean healthCheck(List list) { + return (count++) > 0; + } + } + + @Test + void testWithClampAdminTopics() { + var parameters = CommonTestData.getParticipantParameters(); + var topic = new TopicParameters(); + topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name()); + parameters.getIntermediaryParameters().setClampAdminTopics(topic); + var publishers = List.of(mock(Publisher.class)); + var listeners = List.of(mock(ParticipantStatusReqListener.class)); + var activator = mock(IntermediaryActivator.class); + var participantHandler = mock(ParticipantHandler.class); + var topicHealthCheck = new DummyTopicHealthCheck(); + var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler) { + @Override + protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) { + return topicHealthCheck; + } + }; + + when(activator.isAlive()).thenReturn(false); + brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class)); + verify(activator).start(); + } +} diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java index 89bafa196..016f46c49 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2024 Nordix Foundation. + * Copyright (C) 2021-2024-2025 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -39,8 +38,6 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSt import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.springframework.context.event.ContextClosedEvent; -import org.springframework.context.event.ContextRefreshedEvent; class IntermediaryActivatorTest { private static final Coder CODER = new StandardCoder(); @@ -68,8 +65,8 @@ class IntermediaryActivatorTest { List> listeners = List.of(listenerFirst, listenerSecond); - var handler = mock(ParticipantHandler.class); - try (var activator = new IntermediaryActivator(parameters, handler, publishers, listeners)) { + try (var activator = new IntermediaryActivator()) { + activator.config(parameters, publishers, listeners); assertFalse(activator.isAlive()); activator.start(); @@ -95,9 +92,6 @@ class IntermediaryActivatorTest { // repeat stop - should throw an exception assertThatIllegalStateException().isThrownBy(activator::stop); assertFalse(activator.isAlive()); - - assertDoesNotThrow(() -> activator.handleContextRefreshEvent(mock(ContextRefreshedEvent.class))); - assertDoesNotThrow(() -> activator.handleContextClosedEvent(mock(ContextClosedEvent.class))); } } } -- cgit 1.2.3-korg