diff options
author | sushant53 <sushant.jadhav@t-systems.com> | 2023-08-11 19:45:44 +0530 |
---|---|---|
committer | Sushant Jadhav <sushant.jadhav@t-systems.com> | 2023-09-11 12:09:33 +0000 |
commit | 86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch) | |
tree | 0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java | |
parent | 9d8a9326758a162eb26236a1dd9de1c29c504554 (diff) |
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs.
Issue-ID: DCAEGEN2-3364
Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java | 159 |
1 files changed, 114 insertions, 45 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index a1ad951f..a806ba19 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; @@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mockStatic; import static org.mockserver.model.HttpRequest.request; import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; @@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterPublisherIT { @Container @@ -74,6 +91,7 @@ class MessageRouterPublisherIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n" + "{" @@ -108,22 +126,46 @@ class MessageRouterPublisherIT { + "}" + "}"; - private final MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private final MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - + + @AfterEach + void afterEach() { + publisher.close(); + subscriber.close(); + } + @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } - + + @Disabled @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -143,7 +185,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleBadRequestError() { //given @@ -175,13 +218,16 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -200,12 +246,15 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -229,11 +278,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -256,10 +307,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -283,10 +337,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -310,10 +367,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -321,7 +381,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { //given @@ -347,7 +408,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() { //given @@ -380,7 +442,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() { //given @@ -412,7 +475,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryManyTimesAndSuccessfullyPublish() { //given @@ -453,7 +517,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } - + + @Disabled @Test void publisher_shouldHandleLastRetryError500() { //given @@ -489,7 +554,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() { //given @@ -521,7 +587,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() { //given @@ -553,7 +620,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() { //given @@ -581,7 +649,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path) .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() { //given |