diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java | 90 |
1 files changed, 65 insertions, 25 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 3d43e817..48a12455 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; @@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterSubscriberIT { @Container @@ -71,16 +80,11 @@ class MessageRouterSubscriberIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + - "{" + - "\"mrstatus\":3001," + - "\"helpURL\":\"http://onap.readthedocs.io\"," + - "\"message\":\"No such topic exists.-[%s]\"," + - "\"status\":404" + - "}"; + private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + "{" + "\"requestError\":" @@ -94,22 +98,44 @@ class MessageRouterSubscriberIT { + "}" + "}"; - private MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } + + @AfterEach + void afterEach() { + if(publisher != null) + publisher.close(); + if(subscriber != null) + subscriber.close(); } - @Test void subscriber_shouldHandleNoSuchTopicException() { //given @@ -144,11 +170,13 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -171,11 +199,14 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -183,6 +214,7 @@ class MessageRouterSubscriberIT { .verify(); } + @Disabled @Test void subscriber_shouldExtractItemsFromResponse() { //given @@ -208,7 +240,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldSubscribeToTopic() { //given @@ -235,7 +268,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldHandleTimeoutException() { //given @@ -261,6 +295,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } + @Disabled @Test void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { //given @@ -298,6 +333,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { //given @@ -336,6 +372,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() { //given @@ -383,6 +420,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Disabled @Test void subscriber_shouldHandleLastRetryError500() { //given @@ -416,6 +454,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() { //given @@ -445,6 +484,7 @@ class MessageRouterSubscriberIT { .verify(TIMEOUT); } + @Disabled @Test void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() { //given |