aboutsummaryrefslogtreecommitdiffstats
path: root/participant
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2024-11-08 13:36:15 +0000
committerFrancescoFioraEst <francesco.fiora@est.tech>2025-01-13 16:43:25 +0000
commita57452cf03010acee39542550db00d428a300863 (patch)
tree28e18982479347da1ebdb228dba4b30defc76e44 /participant
parent11a7fa10d470b3f6b4bcc0e917bbeb9a58019153 (diff)
Add Kafka Health Check in ACM
Issue-ID: POLICY-5203 Change-Id: Id1705ac74d53cd5f2f8c3d81aae2366baf774fcf Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant')
-rw-r--r--participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java140
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java75
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java6
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java113
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java12
6 files changed, 285 insertions, 67 deletions
diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
index 3f4d79472..d87219bd6 100644
--- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
@@ -20,6 +20,12 @@ participant:
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
+ topicValidation: true
+ clampAdminTopics:
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
clampAutomationCompositionTopics:
topicSources:
- topic: ${participant.intermediaryParameters.topics.operationTopic}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java
new file mode 100644
index 000000000..1c862e925
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarter.java
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheckFactory;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class BrokerStarter<T> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BrokerStarter.class);
+ private final IntermediaryActivator activator;
+ private final ParticipantHandler participantHandler;
+ private final TopicHealthCheck topicHealthCheck;
+
+ private final ParticipantParameters parameters;
+ private final List<Publisher> publishers;
+ private final List<Listener<T>> listeners;
+
+ /**
+ * Constructor.
+ *
+ * @param parameters ParticipantParameters
+ * @param activator IntermediaryActivator
+ * @param participantHandler participantHandler
+ */
+ public BrokerStarter(ParticipantParameters parameters,
+ List<Publisher> publishers, List<Listener<T>> listeners, IntermediaryActivator activator,
+ ParticipantHandler participantHandler) {
+ this.parameters = parameters;
+ this.listeners = listeners;
+ this.publishers = publishers;
+ this.activator = activator;
+ this.participantHandler = participantHandler;
+ var topic = parameters.getIntermediaryParameters().getClampAdminTopics();
+ if (topic == null) {
+ topic = new TopicParameters();
+ topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+ }
+ this.topicHealthCheck = createTopicHealthCheck(topic);
+ }
+
+ protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) {
+ return new TopicHealthCheckFactory().getTopicHealthCheck(topic);
+ }
+
+ /**
+ * Handle ContextRefreshEvent.
+ *
+ * @param ctxRefreshedEvent ContextRefreshedEvent
+ */
+ @EventListener
+ public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
+ if (!activator.isAlive()) {
+ runTopicHealthCheck();
+ start();
+ }
+ }
+
+ private void runTopicHealthCheck() {
+ var fetchTimeout = getFetchTimeout();
+ while (!topicHealthCheck.healthCheck(getTopics())) {
+ LOGGER.debug(" Broker not up yet!");
+ try {
+ Thread.sleep(fetchTimeout);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private List<String> getTopics() {
+ var opTopic = parameters.getIntermediaryParameters().getTopics().getOperationTopic();
+ var syncTopic = parameters.getIntermediaryParameters().getTopics().getSyncTopic();
+ return Boolean.TRUE.equals(parameters.getIntermediaryParameters().getTopicValidation())
+ ? List.of(opTopic, syncTopic) : List.<String>of();
+ }
+
+ private int getFetchTimeout() {
+ int fetchTimeout = parameters.getIntermediaryParameters().getClampAdminTopics() == null
+ ? 0 : parameters.getIntermediaryParameters().getClampAdminTopics().getFetchTimeout();
+ return Math.max(fetchTimeout, 5000);
+ }
+
+ private void start() {
+ activator.config(parameters, publishers, listeners);
+ activator.start();
+ var task = new TimerTask() {
+ @Override
+ public void run() {
+ new Thread(participantHandler::sendParticipantRegister).start();
+ }
+ };
+ new Timer().schedule(task, 5000);
+ }
+
+
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ if (activator.isAlive()) {
+ participantHandler.sendParticipantDeregister();
+ activator.stop();
+ }
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index 388603099..cb8df0a31 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2024 Nordix Foundation.
+ * Copyright (C) 2021,2024-2025 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,9 +23,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
@@ -34,9 +33,6 @@ import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.event.ContextClosedEvent;
-import org.springframework.context.event.ContextRefreshedEvent;
-import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
@@ -48,10 +44,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
private static final String[] MSG_TYPE_NAMES = {"messageType"};
// Topics from which the participant receives and to which the participant sends messages
- private final List<TopicSink> topicSinks;
- private final List<TopicSource> topicSources;
-
- private final ParticipantHandler participantHandler;
+ private final List<TopicSink> topicSinks = new ArrayList<>();
+ private final List<TopicSource> topicSources = new ArrayList<>();
@Getter
private final MessageTypeDispatcher msgDispatcher;
@@ -60,26 +54,29 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
private final MessageTypeDispatcher syncMsgDispatcher;
/**
+ * Constructor.
+ */
+ public IntermediaryActivator() {
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ }
+
+ /**
* Instantiate the activator for participant.
*
* @param parameters the ParticipantParameters
- * @param participantHandler the ParticipantHandler
* @param publishers list of Publishers
* @param listeners list of Listeners
*/
- public <T> IntermediaryActivator(final ParticipantParameters parameters, ParticipantHandler participantHandler,
- List<Publisher> publishers, List<Listener<T>> listeners) {
- this.participantHandler = participantHandler;
+ public <T> void config(ParticipantParameters parameters,
+ List<Publisher> publishers, List<Listener<T>> listeners) {
- topicSinks = TopicEndpointManager.getManager().addTopicSinks(
- parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks());
+ topicSinks.addAll(TopicEndpointManager.getManager().addTopicSinks(
+ parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSinks()));
- topicSources = TopicEndpointManager.getManager().addTopicSources(
- parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources());
+ topicSources.addAll(TopicEndpointManager.getManager().addTopicSources(
+ parameters.getIntermediaryParameters().getClampAutomationCompositionTopics().getTopicSources()));
- msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-
- syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
// @formatter:off
addAction("Topic endpoint management",
@@ -109,42 +106,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
}
/**
- * Handle ContextRefreshEvent.
- *
- * @param ctxRefreshedEvent ContextRefreshedEvent
- */
- @EventListener
- public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
- if (!isAlive()) {
- start();
- var task = new TimerTask() {
- @Override
- public void run() {
- new Thread(participantHandler::sendParticipantRegister).start();
- }
- };
- new Timer().schedule(task, 5000);
- }
- }
-
- /**
- * Handle ContextClosedEvent.
- *
- * @param ctxClosedEvent ContextClosedEvent
- */
- @EventListener
- public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
- if (isAlive()) {
- sendParticipantDeregister();
- stop();
- }
- }
-
- private void sendParticipantDeregister() {
- participantHandler.sendParticipantDeregister();
- }
-
- /**
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher(Topics topics) {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
index dad9c8ae7..d94dc5d39 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2024 Nordix Foundation.
+ * Copyright (C) 2021-2024-2025 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import lombok.Getter;
import lombok.Setter;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
import org.onap.policy.common.parameters.topic.TopicParameterGroup;
+import org.onap.policy.common.parameters.topic.TopicParameters;
import org.onap.policy.common.parameters.validation.ParameterGroupConstraint;
/**
@@ -64,4 +65,7 @@ public class ParticipantIntermediaryParameters {
@Valid
private Topics topics = new Topics();
+ private Boolean topicValidation = false;
+
+ private TopicParameters clampAdminTopics;
}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java
new file mode 100644
index 000000000..093ae9e90
--- /dev/null
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/BrokerStarterTest.java
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener;
+import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+
+class BrokerStarterTest {
+
+ @Test
+ void testWithClampAdminTopicsNull() {
+ var parameters = CommonTestData.getParticipantParameters();
+ parameters.getIntermediaryParameters().setClampAdminTopics(null);
+ var publishers = List.of(mock(Publisher.class));
+ var listeners = List.of(mock(ParticipantStatusReqListener.class));
+ var activator = mock(IntermediaryActivator.class);
+ var participantHandler = mock(ParticipantHandler.class);
+ var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler);
+ when(activator.isAlive()).thenReturn(false);
+
+ brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+ verify(activator).start();
+
+ brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class));
+ verify(participantHandler, times(0)).sendParticipantDeregister();
+ verify(activator, times(0)).stop();
+ }
+
+ @Test
+ void testAlreadyAlive() {
+ var parameters = CommonTestData.getParticipantParameters();
+ var topic = new TopicParameters();
+ topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+ parameters.getIntermediaryParameters().setClampAdminTopics(topic);
+ var publishers = List.of(mock(Publisher.class));
+ var listeners = List.of(mock(ParticipantStatusReqListener.class));
+ var activator = mock(IntermediaryActivator.class);
+ var participantHandler = mock(ParticipantHandler.class);
+ var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler);
+
+ when(activator.isAlive()).thenReturn(true);
+ brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+ verify(activator, times(0)).start();
+
+ brokerStarter.handleContextClosedEvent(mock(ContextClosedEvent.class));
+ verify(activator).stop();
+ verify(participantHandler).sendParticipantDeregister();
+ }
+
+ private static class DummyTopicHealthCheck implements TopicHealthCheck {
+
+ int count = 0;
+
+ // first call is false, next will be true
+ @Override
+ public boolean healthCheck(List<String> list) {
+ return (count++) > 0;
+ }
+ }
+
+ @Test
+ void testWithClampAdminTopics() {
+ var parameters = CommonTestData.getParticipantParameters();
+ var topic = new TopicParameters();
+ topic.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+ parameters.getIntermediaryParameters().setClampAdminTopics(topic);
+ var publishers = List.of(mock(Publisher.class));
+ var listeners = List.of(mock(ParticipantStatusReqListener.class));
+ var activator = mock(IntermediaryActivator.class);
+ var participantHandler = mock(ParticipantHandler.class);
+ var topicHealthCheck = new DummyTopicHealthCheck();
+ var brokerStarter = new BrokerStarter(parameters, publishers, listeners, activator, participantHandler) {
+ @Override
+ protected TopicHealthCheck createTopicHealthCheck(TopicParameters topic) {
+ return topicHealthCheck;
+ }
+ };
+
+ when(activator.isAlive()).thenReturn(false);
+ brokerStarter.handleContextRefreshEvent(mock(ContextRefreshedEvent.class));
+ verify(activator).start();
+ }
+}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
index 89bafa196..016f46c49 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2024 Nordix Foundation.
+ * Copyright (C) 2021-2024-2025 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.acm.participant.intermediary.handler;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -39,8 +38,6 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSt
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.springframework.context.event.ContextClosedEvent;
-import org.springframework.context.event.ContextRefreshedEvent;
class IntermediaryActivatorTest {
private static final Coder CODER = new StandardCoder();
@@ -68,8 +65,8 @@ class IntermediaryActivatorTest {
List<Listener<ParticipantStatusReq>> listeners = List.of(listenerFirst, listenerSecond);
- var handler = mock(ParticipantHandler.class);
- try (var activator = new IntermediaryActivator(parameters, handler, publishers, listeners)) {
+ try (var activator = new IntermediaryActivator()) {
+ activator.config(parameters, publishers, listeners);
assertFalse(activator.isAlive());
activator.start();
@@ -95,9 +92,6 @@ class IntermediaryActivatorTest {
// repeat stop - should throw an exception
assertThatIllegalStateException().isThrownBy(activator::stop);
assertFalse(activator.isAlive());
-
- assertDoesNotThrow(() -> activator.handleContextRefreshEvent(mock(ContextRefreshedEvent.class)));
- assertDoesNotThrow(() -> activator.handleContextClosedEvent(mock(ContextClosedEvent.class)));
}
}
}