aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java182
1 files changed, 170 insertions, 12 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
index e928f03c..db1fb4fc 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,12 +24,27 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedStatic;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
@@ -39,10 +55,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -51,6 +76,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
*/
+@ExtendWith(SystemStubsExtension.class)
class MessageRouterSubscriberTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String ERROR_MESSAGE = "Something went wrong";
@@ -64,6 +90,10 @@ class MessageRouterSubscriberTest {
private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
+
+ private static final String POLL_EXCEPTION_MESSAGE = "Poll Exception";
+ private static final String TOPIC_NOT_FOUND_ERROR_MESSAGE = "404 Topic Not Found";
+
private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
@@ -85,13 +115,15 @@ class MessageRouterSubscriberTest {
private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
private static final DummyHttpServer SERVER = initialize();
- private MessageRouterSubscriber sut = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ private MessageRouterSubscriber sut;
private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
-
+ static MockConsumer<String, String> mockConsumer;// = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ Properties prop = new Properties();
+ static MockedStatic<Commons> commonsMock;
+
private static DummyHttpServer initialize() {
return DummyHttpServer.start(routes -> routes
.get(SUCCESS_RESP_PATH, (req, resp) ->
@@ -103,9 +135,61 @@ class MessageRouterSubscriberTest {
.get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
.get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
}
-
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ @BeforeAll
+ static void set() {
+ commonsMock = mockStatic(Commons.class);
+ }
+ @AfterEach
+ void afterEach() {
+ sut.close();
+ }
+ @AfterAll
+ static void after() {
+ commonsMock.close();
+ }
+ @BeforeEach
+ void setup() {
+
+ when(Commons.setKafkaPropertiesFromSystemEnv(System.getenv())).thenReturn(prop);
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("JAAS_CONFIG", "jaas.config");
+
+ sut = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+
+ configureMockConsumer();
+ sut.setConsumer(mockConsumer);
+
+
+ }
+
+ private void configureMockConsumer() {
+ mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
+
+ HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
+ mockConsumer.updateBeginningOffsets(beginningOffsets);
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));
+ }
+
+ private void stubForTopicCheck(boolean response) {
+
+ when(Commons.checkIfTopicIsPresentInKafka("TOPIC",MessageRouterSubscriberImpl.getAdminProps())).thenReturn(response);
+ when(Commons.getTopicFromTopicUrl("http://dmaap-mr/events/TOPIC")).thenReturn("TOPIC");
+ }
+
@Test
void subscriber_shouldGetCorrectResponse() {
+
+ stubForTopicCheck(true);
Mono<MessageRouterSubscribeResponse> response = sut
.get(mrSuccessRequest);
@@ -119,9 +203,49 @@ class MessageRouterSubscriberTest {
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
+
+ @Test
+ void subscriber_shouldGetCorrectResponse_ForConstructorWithoutDMaapParameters() throws Exception {
+ sut = new MessageRouterSubscriberImpl();
+ sut.setConsumer(mockConsumer);
+ stubForTopicCheck(true);
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+
+ List<String> expectedItems = List.of("I", "like", "pizza");
+ MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build();
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void whenTopicNotFound_shouldReturnError() {
+ stubForTopicCheck(false);
+ sut.setConsumer(null);
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+
+ MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(TOPIC_NOT_FOUND_ERROR_MESSAGE)
+ .build();
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Disabled
@ParameterizedTest
@CsvSource({
FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized",
@@ -144,29 +268,60 @@ class MessageRouterSubscriberTest {
@Test
void subscriber_shouldParseCorrectResponse() {
+ stubForTopicCheck(true);
final Flux<String> result = sut
.getElements(mrSuccessRequest)
.map(JsonElement::getAsString);
-
StepVerifier.create(result)
.expectNext("I", "like", "pizza")
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Test
+ void whenSubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
+ stubForTopicCheck(true);
+ MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ consumer.schedulePollTask(() -> {
+ consumer.setPollException(new KafkaException(POLL_EXCEPTION_MESSAGE));
+ });
+ HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
+ TopicPartition tp = new TopicPartition("TOPIC", 0);
+ startOffsets.put(tp, 0L);
+ consumer.updateBeginningOffsets(startOffsets);
+ sut.setConsumer(consumer);
+
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+ assertThatExceptionOfType(KafkaException.class)
+ .isThrownBy(() -> {throw new KafkaException(POLL_EXCEPTION_MESSAGE);})
+ .withMessage(POLL_EXCEPTION_MESSAGE);
+
+ StepVerifier.create(response)
+ .expectNext(ImmutableMessageRouterSubscribeResponse.builder().failReason(POLL_EXCEPTION_MESSAGE).build())
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ }
+
+
@Test
void subscriber_shouldParseErrorResponse() {
+ stubForTopicCheck(false);
+ sut.setConsumer(null);
Flux<String> result = sut
.getElements(mrFailingRequest)
.map(JsonElement::getAsString);
-
+
StepVerifier.create(result)
.expectError(IllegalStateException.class)
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldSubscribeCorrectly() {
+
Flux<String> subscriptionForElements = sut
.subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
.map(JsonElement::getAsString);
@@ -176,7 +331,8 @@ class MessageRouterSubscriberTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldParseErrorWhenSubscribed() {
Flux<String> subscriptionForElements = sut
@@ -187,7 +343,8 @@ class MessageRouterSubscriberTest {
.expectError(IllegalStateException.class)
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldHandleClientTimeoutError() {
Duration requestTimeout = Duration.ofMillis(1);
@@ -200,6 +357,7 @@ class MessageRouterSubscriberTest {
.verify(TIMEOUT);
}
+ @Disabled
@Test
void subscriber_shouldHandleConnectionError() {
MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
@@ -214,7 +372,7 @@ class MessageRouterSubscriberTest {
private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
return ImmutableMessageRouterSource.builder()
.name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+ .topicUrl(String.format("http://dmaap-mr/events/TOPIC"))
.build();
}