author    | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-07-22 14:21:26 +0200
committer | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-07-23 09:44:23 +0200
commit    | 2513e6ea205ca1a6d1c7ba13b8b448ab649966c2 (patch)
tree      | f0a8507c0d2b7f570952ec1c7a22db04ca9e9858 /aai-core/src/test/java
parent    | 405369a8be85f53208cc97a44d8fb3942313e2e7 (diff)
Make JMS-based messaging compatible with tracing
- use dependency injection instead of new Foo() for JMS-related classes
- inject interfaces rather than their implementations
- add an integration test that asserts message sending via JMS to Kafka [1]
[1] this also prepares for the removal of ActiveMQ as a middleman
Issue-ID: AAI-3932
Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
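The dependency-injection change described in the commit message can be illustrated with a minimal sketch (not part of this commit): the MessageProducer interface and its sendMessageToDefaultDestination(String) method appear in the integration test below, while NotificationDispatcher and its package are hypothetical names used only for illustration, and AAIKafkaEventJMSProducer is assumed to be the JMS-backed implementation that the Spring context selects.

```java
package org.onap.aai.example; // hypothetical package, for illustration only

import org.onap.aai.kafka.MessageProducer;
import org.springframework.stereotype.Component;

// Hypothetical caller showing the pattern the commit message describes:
// the MessageProducer interface is constructor-injected instead of the class
// doing "new AAIKafkaEventJMSProducer()" itself, so the Spring context picks
// the implementation and tracing instrumentation can wrap the bean.
@Component
public class NotificationDispatcher {

    private final MessageProducer messageProducer;

    public NotificationDispatcher(MessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    public void dispatch(String aaiEventJson) {
        // Delegates to whatever MessageProducer implementation is registered
        // (assumed here to be the JMS-backed AAIKafkaEventJMSProducer).
        messageProducer.sendMessageToDefaultDestination(aaiEventJson);
    }
}
```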
Diffstat (limited to 'aai-core/src/test/java')
4 files changed, 177 insertions, 90 deletions
diff --git a/aai-core/src/test/java/org/onap/aai/AAISetup.java b/aai-core/src/test/java/org/onap/aai/AAISetup.java
index 16f21ff4..08a0e91b 100644
--- a/aai-core/src/test/java/org/onap/aai/AAISetup.java
+++ b/aai-core/src/test/java/org/onap/aai/AAISetup.java
@@ -40,6 +40,7 @@ import org.onap.aai.setup.AAIConfigTranslator;
 import org.onap.aai.setup.SchemaVersion;
 import org.onap.aai.setup.SchemaVersions;
 import org.onap.aai.util.AAIConstants;
+import org.onap.aai.web.KafkaConfig;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.test.context.ContextConfiguration;
@@ -52,7 +53,8 @@ import org.springframework.test.context.web.WebAppConfiguration;
 @ContextConfiguration(
         classes = {ConfigConfiguration.class, AAIConfigTranslator.class, EdgeIngestor.class, EdgeSerializer.class,
                 NodeIngestor.class, SpringContextAware.class, IntrospectionConfig.class, RestBeanConfig.class,
-                XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class, LoaderFactory.class, NotificationService.class})
+                XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class,
+                KafkaConfig.class, LoaderFactory.class, NotificationService.class})
 @TestPropertySource(
         properties = {"schema.uri.base.path = /aai", "schema.xsd.maxoccurs = 5000", "schema.translator.list=config",
                 "schema.nodes.location=src/test/resources/onap/oxm",
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
new file mode 100644
index 00000000..c10260da
--- /dev/null
+++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
@@ -0,0 +1,97 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.onap.aai.AAISetup;
+import org.onap.aai.PayloadUtil;
+import org.onap.aai.restcore.HttpMethod;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@ActiveProfiles("kafka")
+@Import(KafkaTestConfiguration.class)
+@EmbeddedKafka(partitions = 1, topics = { "AAI-EVENT" })
+@TestPropertySource(
+        properties = {
+                "jms.bind.address=tcp://localhost:61647",
+                "aai.events.enabled=true",
+                "spring.kafka.producer.retries=0",
+                "spring.kafka.producer.properties.sasl.jaas.config=#{null}",
+                "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}"
+        })
+public class AAIKafkaEventIntegrationTest extends AAISetup {
+
+    @Mock
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @Autowired
+    MessageProducer messageProducer;
+
+    @Autowired
+    private ConsumerFactory<String, String> consumerFactory;
+
+    @Test
+    public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
+            throws Exception {
+        Consumer<String, String> consumer = consumerFactory.createConsumer();
+
+        consumer.subscribe(Collections.singletonList("AAI-EVENT"));
+
+        String payload = PayloadUtil.getResourcePayload("aai-event.json");
+        String expectedResponse = PayloadUtil.getExpectedPayload("aai-event.json");
+        messageProducer.sendMessageToDefaultDestination(payload);
+
+        ConsumerRecords<String, String> consumerRecords = KafkaTestUtils.getRecords(consumer, 10000);
+        assertFalse(consumerRecords.isEmpty());
+        consumerRecords.forEach(consumerRecord -> {
+            JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.NON_EXTENSIBLE);
+        });
+    }
+
+}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
deleted file mode 100644
index c72499c4..00000000
--- a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.onap.aai.kafka;
-
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import javax.jms.TextMessage;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.aai.PayloadUtil;
-import org.springframework.core.env.Environment;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.util.ReflectionTestUtils;
-
-@RunWith(MockitoJUnitRunner.class)
-@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
-public class AAIKafkaEventJMSConsumerTest {
-
-    @Mock
-    private Environment environment;
-
-    @Mock
-    private KafkaTemplate<String,String> kafkaTemplate;
-
-    private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer;
-
-    @Before
-    public void setUp(){
-        aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate);
-    }
-
-    @Test
-    public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
-        throws Exception
-    {
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        String payload = PayloadUtil.getResourcePayload("aai-event.json");
-
-        when(mockTextMessage.getText()).thenReturn(payload);
-        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
-        verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString());
-    }
-
-    @Test
-    public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json");
-        when(mockTextMessage.getText()).thenReturn(payload);
-        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
-    }
-
-
-    @Test
-    public void onMessage_shouldHandleJSONException() throws Exception {
-        // Arrange
-        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate
-
-        // Act
-        consumer.onMessage(mockTextMessage);
-
-        // Assert
-        // Verify that exception is logged
-    }
-
-    @Test
-    public void onMessage_shouldHandleGenericException() throws Exception {
-        // Arrange
-        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields
-
-        // Act
-        consumer.onMessage(mockTextMessage);
-
-        // Assert
-        // Verify that exception is logged
-    }
-
-}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java
new file mode 100644
index 00000000..730699e6
--- /dev/null
+++ b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java
@@ -0,0 +1,77 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.test.context.TestPropertySource;
+
+@TestConfiguration
+public class KafkaTestConfiguration {
+
+    @Value("${spring.embedded.kafka.brokers}") private String bootstrapAddress;
+
+    private String groupId = "test-consumer";
+
+    @Bean
+    public KafkaAdmin kafkaAdmin() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        return new KafkaAdmin(configs);
+    }
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
+            ConsumerFactory<String, String> consumerFactory) {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory);
+        return factory;
+    }
+}