aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java25
-rw-r--r--message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java41
-rw-r--r--message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java109
-rw-r--r--message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java30
-rw-r--r--message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java44
-rw-r--r--message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java122
-rw-r--r--message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java34
7 files changed, 405 insertions, 0 deletions
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java
new file mode 100644
index 00000000..f371eda9
--- /dev/null
+++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java
@@ -0,0 +1,25 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import java.util.List;
+
+public interface TopicHealthCheck {
+ boolean healthCheck(List<String> topics);
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java
new file mode 100644
index 00000000..d7790504
--- /dev/null
+++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.healthcheck.kafka.KafkaHealthCheck;
+import org.onap.policy.common.message.bus.healthcheck.noop.NoopHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+public class TopicHealthCheckFactory {
+
+ /**
+ * Get Topic HealthCheck.
+ *
+ * @param param TopicParameters
+ * @return TopicHealthCheck
+ */
+ public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
+ return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
+ case KAFKA -> new KafkaHealthCheck(param);
+ case NOOP -> new NoopHealthCheck();
+ default -> null;
+ };
+ }
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java
new file mode 100644
index 00000000..ef8a0f76
--- /dev/null
+++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.KafkaException;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaHealthCheck implements TopicHealthCheck {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);
+ private final TopicParameters parameters;
+
+ public KafkaHealthCheck(TopicParameters parameters) {
+ this.parameters = parameters;
+ }
+
+ /**
+ * Check that Kafka is OnLine and topics are available.
+ *
+ * @return true if Kafka is OnLine
+ */
+ public boolean healthCheck(List<String> topics) {
+ if (parameters.getServers() == null || parameters.getServers().isEmpty()) {
+ logger.warn("Kafka Address not defined!");
+ return true;
+ }
+ try (var client = createAdminClient()) {
+ if (!checkConnection(client)) {
+ logger.warn("Kafka not UP yet!");
+ return false;
+ }
+ if (topics.isEmpty()) {
+ logger.warn("Kafka is UP");
+ return true;
+ }
+
+ return checkTopics(client, topics);
+ } catch (KafkaException | ExecutionException e) {
+ logger.error(e.getMessage());
+ return false;
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ private boolean checkConnection(AdminClient client) throws ExecutionException, InterruptedException {
+ var nodes = client.describeCluster().nodes().get();
+ if (nodes == null || nodes.isEmpty()) {
+ return false;
+ }
+ nodes.forEach(node -> logger.debug("nodeId {}", node.id()));
+ return true;
+ }
+
+ private boolean checkTopics(AdminClient client, List<String> topics)
+ throws ExecutionException, InterruptedException {
+ var listTopics = client.listTopics().names().get();
+ if (listTopics == null || listTopics.isEmpty()) {
+ logger.warn("Kafka topics not available!");
+ return false;
+ }
+ var setTopics = listTopics.stream().map(String::toLowerCase).collect(Collectors.toSet());
+ for (var topic : topics) {
+ if (!setTopics.contains(topic.toLowerCase())) {
+ logger.warn("Kafka topic {} not available!", topic);
+ return false;
+ }
+ }
+ logger.info("Kafka is UP and topics available!");
+ return true;
+ }
+
+ protected AdminClient createAdminClient() {
+ var kafkaProps = new Properties();
+ kafkaProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, parameters.getServers().get(0));
+
+ if (parameters.isAdditionalPropsValid()) {
+ kafkaProps.putAll(parameters.getAdditionalProps());
+ }
+ return AdminClient.create(kafkaProps);
+ }
+}
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java
new file mode 100644
index 00000000..684f7f41
--- /dev/null
+++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.noop;
+
+import java.util.List;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+
+public class NoopHealthCheck implements TopicHealthCheck {
+
+ @Override
+ public boolean healthCheck(List<String> topics) {
+ return true;
+ }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java
new file mode 100644
index 00000000..b71731f2
--- /dev/null
+++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+class TopicHealthCheckFactoryTest {
+
+ @Test
+ void testGetTopicHealthCheck() {
+ var topicHealthCheckFactory = new TopicHealthCheckFactory();
+ var param = new TopicParameters();
+ param.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name());
+ var topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+ assertNotNull(topicHealthCheck);
+ param.setTopicCommInfrastructure(Topic.CommInfrastructure.KAFKA.name());
+ topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+ assertNotNull(topicHealthCheck);
+ param.setTopicCommInfrastructure(Topic.CommInfrastructure.REST.name());
+ topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param);
+ assertNull(topicHealthCheck);
+ }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java
new file mode 100644
index 00000000..3b65f73c
--- /dev/null
+++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck;
+import org.onap.policy.common.parameters.topic.TopicParameters;
+
+class KafkaHealthCheckTest {
+
+ @Test
+ void testAdminClient() {
+ var param = new TopicParameters();
+ param.setServers(List.of("localhost"));
+ var healthCheck = new KafkaHealthCheck(param);
+ var result = healthCheck.healthCheck(List.of());
+ assertFalse(result);
+
+ param.setAdditionalProps(Map.of("key", "value"));
+ result = healthCheck.healthCheck(List.of());
+ assertFalse(result);
+ }
+
+ @Test
+ void testMockAdminClientWithError() throws ExecutionException, InterruptedException {
+ var param = new TopicParameters();
+ param.setServers(List.of("localhost"));
+ var adminClient = mock(AdminClient.class);
+ KafkaFuture<Collection<Node>> kafkaFuture = mock(KafkaFuture.class);
+ var describeCluster = mock(DescribeClusterResult.class);
+ when(describeCluster.nodes()).thenReturn(kafkaFuture);
+ when(adminClient.describeCluster()).thenReturn(describeCluster);
+ when(kafkaFuture.get()).thenThrow(new InterruptedException());
+ var healthCheck = createKafkaHealthCheck(adminClient, param);
+ var result = healthCheck.healthCheck(List.of());
+ Assertions.assertFalse(result);
+ }
+
+ @Test
+ void testMockAdminClient() {
+ var param = new TopicParameters();
+ var adminClient = mock(AdminClient.class);
+ // no server address
+ var healthCheck = createKafkaHealthCheck(adminClient, param);
+ var result = healthCheck.healthCheck(List.of());
+ Assertions.assertTrue(result);
+
+ param.setServers(List.of());
+ result = healthCheck.healthCheck(List.of());
+ Assertions.assertTrue(result);
+
+ // no node Kafka
+ param.setServers(List.of("localhost"));
+ healthCheck = createKafkaHealthCheck(adminClient, param);
+ var describeCluster = mock(DescribeClusterResult.class);
+ when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(null));
+ when(adminClient.describeCluster()).thenReturn(describeCluster);
+ result = healthCheck.healthCheck(List.of());
+ Assertions.assertFalse(result);
+
+ // Kafka is UP
+ var node = new Node(1, "localhost", 9092);
+ when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(List.of(node)));
+ result = healthCheck.healthCheck(List.of());
+ Assertions.assertTrue(result);
+
+ // Kafka topics not available
+ var listTopics = mock(ListTopicsResult.class);
+ when(adminClient.listTopics()).thenReturn(listTopics);
+ when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of()));
+ result = healthCheck.healthCheck(List.of("topic"));
+ Assertions.assertFalse(result);
+
+ when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of("topic")));
+ result = healthCheck.healthCheck(List.of("wrongTopic"));
+ Assertions.assertFalse(result);
+
+ // Kafka topics available
+ result = healthCheck.healthCheck(List.of("topic"));
+ Assertions.assertTrue(result);
+ }
+
+ private TopicHealthCheck createKafkaHealthCheck(AdminClient adminClient, TopicParameters param) {
+ return new KafkaHealthCheck(param) {
+ @Override
+ protected AdminClient createAdminClient() {
+ return adminClient;
+ }
+ };
+ }
+}
diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java
new file mode 100644
index 00000000..2eb36040
--- /dev/null
+++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.message.bus.healthcheck.noop;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+class NoopHealthCheckTest {
+
+ @Test
+ void testBuild() {
+ var healthCheck = new NoopHealthCheck();
+ var result = healthCheck.healthCheck(List.of());
+ assertTrue(result);
+ }
+}