diff options
7 files changed, 405 insertions, 0 deletions
diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java new file mode 100644 index 00000000..f371eda9 --- /dev/null +++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheck.java @@ -0,0 +1,25 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck; + +import java.util.List; + +public interface TopicHealthCheck { + boolean healthCheck(List<String> topics); +} diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java new file mode 100644 index 00000000..d7790504 --- /dev/null +++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactory.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck; + +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.message.bus.healthcheck.kafka.KafkaHealthCheck; +import org.onap.policy.common.message.bus.healthcheck.noop.NoopHealthCheck; +import org.onap.policy.common.parameters.topic.TopicParameters; + +public class TopicHealthCheckFactory { + + /** + * Get Topic HealthCheck. + * + * @param param TopicParameters + * @return TopicHealthCheck + */ + public TopicHealthCheck getTopicHealthCheck(TopicParameters param) { + return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { + case KAFKA -> new KafkaHealthCheck(param); + case NOOP -> new NoopHealthCheck(); + default -> null; + }; + } +} diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java new file mode 100644 index 00000000..ef8a0f76 --- /dev/null +++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheck.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck.kafka; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.KafkaException; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck; +import org.onap.policy.common.parameters.topic.TopicParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaHealthCheck implements TopicHealthCheck { + + private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class); + private final TopicParameters parameters; + + public KafkaHealthCheck(TopicParameters parameters) { + this.parameters = parameters; + } + + /** + * Check that Kafka is OnLine and topics are available. + * + * @return true if Kafka is OnLine + */ + public boolean healthCheck(List<String> topics) { + if (parameters.getServers() == null || parameters.getServers().isEmpty()) { + logger.warn("Kafka Address not defined!"); + return true; + } + try (var client = createAdminClient()) { + if (!checkConnection(client)) { + logger.warn("Kafka not UP yet!"); + return false; + } + if (topics.isEmpty()) { + logger.warn("Kafka is UP"); + return true; + } + + return checkTopics(client, topics); + } catch (KafkaException | ExecutionException e) { + logger.error(e.getMessage()); + return false; + } catch (InterruptedException e) { + logger.error(e.getMessage()); + Thread.currentThread().interrupt(); + return false; + } + } + + private boolean checkConnection(AdminClient client) throws ExecutionException, InterruptedException { + var nodes = client.describeCluster().nodes().get(); + if (nodes == null || nodes.isEmpty()) { + return false; + } + nodes.forEach(node -> logger.debug("nodeId {}", node.id())); + return true; + } + + private boolean checkTopics(AdminClient client, List<String> topics) + throws ExecutionException, InterruptedException { + var listTopics = client.listTopics().names().get(); + if (listTopics == null || listTopics.isEmpty()) { + logger.warn("Kafka topics not available!"); + return false; + } + var setTopics = listTopics.stream().map(String::toLowerCase).collect(Collectors.toSet()); + for (var topic : topics) { + if (!setTopics.contains(topic.toLowerCase())) { + logger.warn("Kafka topic {} not available!", topic); + return false; + } + } + logger.info("Kafka is UP and topics available!"); + return true; + } + + protected AdminClient createAdminClient() { + var kafkaProps = new Properties(); + kafkaProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, parameters.getServers().get(0)); + + if (parameters.isAdditionalPropsValid()) { + kafkaProps.putAll(parameters.getAdditionalProps()); + } + return AdminClient.create(kafkaProps); + } +} diff --git a/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java new file mode 100644 index 00000000..684f7f41 --- /dev/null +++ b/message-bus/src/main/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheck.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck.noop; + +import java.util.List; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck; + +public class NoopHealthCheck implements TopicHealthCheck { + + @Override + public boolean healthCheck(List<String> topics) { + return true; + } +} diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java new file mode 100644 index 00000000..b71731f2 --- /dev/null +++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/TopicHealthCheckFactoryTest.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.parameters.topic.TopicParameters; + +class TopicHealthCheckFactoryTest { + + @Test + void testGetTopicHealthCheck() { + var topicHealthCheckFactory = new TopicHealthCheckFactory(); + var param = new TopicParameters(); + param.setTopicCommInfrastructure(Topic.CommInfrastructure.NOOP.name()); + var topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param); + assertNotNull(topicHealthCheck); + param.setTopicCommInfrastructure(Topic.CommInfrastructure.KAFKA.name()); + topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param); + assertNotNull(topicHealthCheck); + param.setTopicCommInfrastructure(Topic.CommInfrastructure.REST.name()); + topicHealthCheck = topicHealthCheckFactory.getTopicHealthCheck(param); + assertNull(topicHealthCheck); + } +} diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java new file mode 100644 index 00000000..3b65f73c --- /dev/null +++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/kafka/KafkaHealthCheckTest.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck.kafka; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.onap.policy.common.message.bus.healthcheck.TopicHealthCheck; +import org.onap.policy.common.parameters.topic.TopicParameters; + +class KafkaHealthCheckTest { + + @Test + void testAdminClient() { + var param = new TopicParameters(); + param.setServers(List.of("localhost")); + var healthCheck = new KafkaHealthCheck(param); + var result = healthCheck.healthCheck(List.of()); + assertFalse(result); + + param.setAdditionalProps(Map.of("key", "value")); + result = healthCheck.healthCheck(List.of()); + assertFalse(result); + } + + @Test + void testMockAdminClientWithError() throws ExecutionException, InterruptedException { + var param = new TopicParameters(); + param.setServers(List.of("localhost")); + var adminClient = mock(AdminClient.class); + KafkaFuture<Collection<Node>> kafkaFuture = mock(KafkaFuture.class); + var describeCluster = mock(DescribeClusterResult.class); + when(describeCluster.nodes()).thenReturn(kafkaFuture); + when(adminClient.describeCluster()).thenReturn(describeCluster); + when(kafkaFuture.get()).thenThrow(new InterruptedException()); + var healthCheck = createKafkaHealthCheck(adminClient, param); + var result = healthCheck.healthCheck(List.of()); + Assertions.assertFalse(result); + } + + @Test + void testMockAdminClient() { + var param = new TopicParameters(); + var adminClient = mock(AdminClient.class); + // no server address + var healthCheck = createKafkaHealthCheck(adminClient, param); + var result = healthCheck.healthCheck(List.of()); + Assertions.assertTrue(result); + + param.setServers(List.of()); + result = healthCheck.healthCheck(List.of()); + Assertions.assertTrue(result); + + // no node Kafka + param.setServers(List.of("localhost")); + healthCheck = createKafkaHealthCheck(adminClient, param); + var describeCluster = mock(DescribeClusterResult.class); + when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(null)); + when(adminClient.describeCluster()).thenReturn(describeCluster); + result = healthCheck.healthCheck(List.of()); + Assertions.assertFalse(result); + + // Kafka is UP + var node = new Node(1, "localhost", 9092); + when(describeCluster.nodes()).thenReturn(KafkaFuture.completedFuture(List.of(node))); + result = healthCheck.healthCheck(List.of()); + Assertions.assertTrue(result); + + // Kafka topics not available + var listTopics = mock(ListTopicsResult.class); + when(adminClient.listTopics()).thenReturn(listTopics); + when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of())); + result = healthCheck.healthCheck(List.of("topic")); + Assertions.assertFalse(result); + + when(listTopics.names()).thenReturn(KafkaFuture.completedFuture(Set.of("topic"))); + result = healthCheck.healthCheck(List.of("wrongTopic")); + Assertions.assertFalse(result); + + // Kafka topics available + result = healthCheck.healthCheck(List.of("topic")); + Assertions.assertTrue(result); + } + + private TopicHealthCheck createKafkaHealthCheck(AdminClient adminClient, TopicParameters param) { + return new KafkaHealthCheck(param) { + @Override + protected AdminClient createAdminClient() { + return adminClient; + } + }; + } +} diff --git a/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java new file mode 100644 index 00000000..2eb36040 --- /dev/null +++ b/message-bus/src/test/java/org/onap/policy/common/message/bus/healthcheck/noop/NoopHealthCheckTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.message.bus.healthcheck.noop; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import org.junit.jupiter.api.Test; + +class NoopHealthCheckTest { + + @Test + void testBuild() { + var healthCheck = new NoopHealthCheck(); + var result = healthCheck.healthCheck(List.of()); + assertTrue(result); + } +} |