diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client')
8 files changed, 486 insertions, 100 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java index 5b1984df..a1f9ac9f 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +39,6 @@ final class DMaapContainer { static DockerComposeContainer createContainerInstance(){ return new DockerComposeContainer( new File(DOCKER_COMPOSE_FILE_PATH)) - .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT) .withLocalCompose(true); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index a1ad951f..a806ba19 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; @@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mockStatic; import static org.mockserver.model.HttpRequest.request; import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; @@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterPublisherIT { @Container @@ -74,6 +91,7 @@ class MessageRouterPublisherIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n" + "{" @@ -108,22 +126,46 @@ class MessageRouterPublisherIT { + "}" + "}"; - private final MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private final MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - + + @AfterEach + void afterEach() { + publisher.close(); + subscriber.close(); + } + @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } - + + @Disabled @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -143,7 +185,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleBadRequestError() { //given @@ -175,13 +218,16 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -200,12 +246,15 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -229,11 +278,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -256,10 +307,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -283,10 +337,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -310,10 +367,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -321,7 +381,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { //given @@ -347,7 +408,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() { //given @@ -380,7 +442,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() { //given @@ -412,7 +475,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryManyTimesAndSuccessfullyPublish() { //given @@ -453,7 +517,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } - + + @Disabled @Test void publisher_shouldHandleLastRetryError500() { //given @@ -489,7 +554,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() { //given @@ -521,7 +587,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() { //given @@ -553,7 +620,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() { //given @@ -581,7 +649,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path) .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() { //given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 97fd26f5..816021bb 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Ignore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; @@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; @@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ +@ExtendWith(SystemStubsExtension.class) class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; @@ -71,9 +91,9 @@ class MessageRouterPublisherTest { private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); private static final DummyHttpServer SERVER = initialize(); - private MessageRouterPublisher sut = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - + private MessageRouterPublisher sut; + MockProducer<String, String> mockProducer = + new MockProducer<>(true, new StringSerializer(), new StringSerializer()); private static DummyHttpServer initialize() { return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) @@ -86,7 +106,25 @@ class MessageRouterPublisherTest { .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) ); } - + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @BeforeEach + void setUp() { + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("JAAS_CONFIG", "jaas.config"); + + sut = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + sut.setKafkaProducer(mockProducer); + } + @AfterEach + void afterEach() { + sut.close(); + } + @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -102,7 +140,45 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } + + @Test + void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); + sut = new MessageRouterPublisherImpl(); + sut.setKafkaProducer(mockProducer); + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + //then + StepVerifier.create(result) + .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build()) + .expectComplete() + .verify(TIMEOUT); + } + @Test + void publisher_shouldHandleError() { + + sut.setKafkaProducer(mockProducer); + + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + RuntimeException e = new RuntimeException(); + mockProducer.errorNext(e); + Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture(); + try{ + record.get(); + }catch(Exception ex) { + assertEquals(e, ex); + } + assertTrue(record.isDone()); + + } + + @Disabled @ParameterizedTest @CsvSource({ FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request", @@ -126,7 +202,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutError() { //given @@ -142,7 +219,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleConnectionError() { //given @@ -179,9 +257,7 @@ class MessageRouterPublisherTest { private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") - .topicUrl(String.format("http://%s:%d%s", - dummyHttpServer.host(), - dummyHttpServer.port(), + .topicUrl(String.format("http://dmaap-mr%s", topicPath) ) .build(); diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 3d43e817..48a12455 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; @@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterSubscriberIT { @Container @@ -71,16 +80,11 @@ class MessageRouterSubscriberIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + - "{" + - "\"mrstatus\":3001," + - "\"helpURL\":\"http://onap.readthedocs.io\"," + - "\"message\":\"No such topic exists.-[%s]\"," + - "\"status\":404" + - "}"; + private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + "{" + "\"requestError\":" @@ -94,22 +98,44 @@ class MessageRouterSubscriberIT { + "}" + "}"; - private MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } + + @AfterEach + void afterEach() { + if(publisher != null) + publisher.close(); + if(subscriber != null) + subscriber.close(); } - @Test void subscriber_shouldHandleNoSuchTopicException() { //given @@ -144,11 +170,13 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -171,11 +199,14 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -183,6 +214,7 @@ class MessageRouterSubscriberIT { .verify(); } + @Disabled @Test void subscriber_shouldExtractItemsFromResponse() { //given @@ -208,7 +240,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldSubscribeToTopic() { //given @@ -235,7 +268,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldHandleTimeoutException() { //given @@ -261,6 +295,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } + @Disabled @Test void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { //given @@ -298,6 +333,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { //given @@ -336,6 +372,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() { //given @@ -383,6 +420,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Disabled @Test void subscriber_shouldHandleLastRetryError500() { //given @@ -416,6 +454,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() { //given @@ -445,6 +484,7 @@ class MessageRouterSubscriberIT { .verify(TIMEOUT); } + @Disabled @Test void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() { //given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index e928f03c..db1fb4fc 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +24,27 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.MockedStatic; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; @@ -39,10 +55,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Properties; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; @@ -51,6 +76,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ +@ExtendWith(SystemStubsExtension.class) class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; @@ -64,6 +90,10 @@ class MessageRouterSubscriberTest { private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409"; private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429"; private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500"; + + private static final String POLL_EXCEPTION_MESSAGE = "Poll Exception"; + private static final String TOPIC_NOT_FOUND_ERROR_MESSAGE = "404 Topic Not Found"; + private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP); @@ -85,13 +115,15 @@ class MessageRouterSubscriberTest { private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); private static final DummyHttpServer SERVER = initialize(); - private MessageRouterSubscriber sut = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + private MessageRouterSubscriber sut; private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER); private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER); private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition); private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); - + static MockConsumer<String, String> mockConsumer;// = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + Properties prop = new Properties(); + static MockedStatic<Commons> commonsMock; + private static DummyHttpServer initialize() { return DummyHttpServer.start(routes -> routes .get(SUCCESS_RESP_PATH, (req, resp) -> @@ -103,9 +135,61 @@ class MessageRouterSubscriberTest { .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); } - + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @BeforeAll + static void set() { + commonsMock = mockStatic(Commons.class); + } + @AfterEach + void afterEach() { + sut.close(); + } + @AfterAll + static void after() { + commonsMock.close(); + } + @BeforeEach + void setup() { + + when(Commons.setKafkaPropertiesFromSystemEnv(System.getenv())).thenReturn(prop); + mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("JAAS_CONFIG", "jaas.config"); + + sut = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + + configureMockConsumer(); + sut.setConsumer(mockConsumer); + + + } + + private void configureMockConsumer() { + mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0))); + + HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); + beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L); + mockConsumer.updateBeginningOffsets(beginningOffsets); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I")); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like")); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza")); + } + + private void stubForTopicCheck(boolean response) { + + when(Commons.checkIfTopicIsPresentInKafka("TOPIC",MessageRouterSubscriberImpl.getAdminProps())).thenReturn(response); + when(Commons.getTopicFromTopicUrl("http://dmaap-mr/events/TOPIC")).thenReturn("TOPIC"); + } + @Test void subscriber_shouldGetCorrectResponse() { + + stubForTopicCheck(true); Mono<MessageRouterSubscribeResponse> response = sut .get(mrSuccessRequest); @@ -119,9 +203,49 @@ class MessageRouterSubscriberTest { StepVerifier.create(response) .expectNext(expectedResponse) .expectComplete() - .verify(TIMEOUT); + .verify(); } + + @Test + void subscriber_shouldGetCorrectResponse_ForConstructorWithoutDMaapParameters() throws Exception { + sut = new MessageRouterSubscriberImpl(); + sut.setConsumer(mockConsumer); + stubForTopicCheck(true); + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + + List<String> expectedItems = List.of("I", "like", "pizza"); + MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems.map(JsonPrimitive::new)) + .build(); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void whenTopicNotFound_shouldReturnError() { + stubForTopicCheck(false); + sut.setConsumer(null); + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + + MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .failReason(TOPIC_NOT_FOUND_ERROR_MESSAGE) + .build(); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Disabled @ParameterizedTest @CsvSource({ FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized", @@ -144,29 +268,60 @@ class MessageRouterSubscriberTest { @Test void subscriber_shouldParseCorrectResponse() { + stubForTopicCheck(true); final Flux<String> result = sut .getElements(mrSuccessRequest) .map(JsonElement::getAsString); - StepVerifier.create(result) .expectNext("I", "like", "pizza") .expectComplete() .verify(TIMEOUT); } - + + @Test + void whenSubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() { + stubForTopicCheck(true); + MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.schedulePollTask(() -> { + consumer.setPollException(new KafkaException(POLL_EXCEPTION_MESSAGE)); + }); + HashMap<TopicPartition, Long> startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition("TOPIC", 0); + startOffsets.put(tp, 0L); + consumer.updateBeginningOffsets(startOffsets); + sut.setConsumer(consumer); + + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> {throw new KafkaException(POLL_EXCEPTION_MESSAGE);}) + .withMessage(POLL_EXCEPTION_MESSAGE); + + StepVerifier.create(response) + .expectNext(ImmutableMessageRouterSubscribeResponse.builder().failReason(POLL_EXCEPTION_MESSAGE).build()) + .expectComplete() + .verify(TIMEOUT); + + } + + @Test void subscriber_shouldParseErrorResponse() { + stubForTopicCheck(false); + sut.setConsumer(null); Flux<String> result = sut .getElements(mrFailingRequest) .map(JsonElement::getAsString); - + StepVerifier.create(result) .expectError(IllegalStateException.class) .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldSubscribeCorrectly() { + Flux<String> subscriptionForElements = sut .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1)) .map(JsonElement::getAsString); @@ -176,7 +331,8 @@ class MessageRouterSubscriberTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldParseErrorWhenSubscribed() { Flux<String> subscriptionForElements = sut @@ -187,7 +343,8 @@ class MessageRouterSubscriberTest { .expectError(IllegalStateException.class) .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldHandleClientTimeoutError() { Duration requestTimeout = Duration.ofMillis(1); @@ -200,6 +357,7 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } + @Disabled @Test void subscriber_shouldHandleConnectionError() { MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition); @@ -214,7 +372,7 @@ class MessageRouterSubscriberTest { private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { return ImmutableMessageRouterSource.builder() .name("the topic") - .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port())) + .topicUrl(String.format("http://dmaap-mr/events/TOPIC")) .build(); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java index 72c35925..3d35c2ac 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +26,13 @@ import io.vavr.Tuple2; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; class CommonsTest { @@ -53,7 +59,26 @@ class CommonsTest { // then verifyBasicAuthHeader(basicAuthHeader, "Og=="); } - + + @Test + void shouldFetchTopicFromTopicURL() { + String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT"; + String expected = "unauthenticated.VES_PNFREG_OUTPUT"; + assertThat(getTopicFromTopicUrl(topicUrl)) + .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected) + .isEqualTo(expected); + } + + @Test + void shouldFetchTopicFromTopicUrlEndingWithSlash() { + String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT/"; + String expected = "unauthenticated.VES_PNFREG_OUTPUT"; + assertThat(getTopicFromTopicUrl(topicUrl)) + .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected) + .isEqualTo(expected); + } + + private AafCredentials create(String username, String password) { return ImmutableAafCredentials.builder() .username(username) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 6c6ded16..2e169dc4 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,8 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMultimap; import io.vavr.collection.List; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; @@ -68,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 */ +@Disabled class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; @@ -75,14 +79,17 @@ class MessageRouterPublisherImplTest { private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl( - httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); + private final MessageRouterPublisher cut; private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500); - + + private MessageRouterPublisherImplTest() throws Exception{ + cut = new MessageRouterPublisherImpl( + httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); + } @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { // given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 006965c2..373424ba 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,11 +32,17 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMultimap; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; @@ -50,6 +57,8 @@ import java.net.ConnectException; * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ + +@Disabled class MessageRouterSubscriberImplTest { private static final String ERROR_MESSAGE = "Something went wrong"; @@ -57,8 +66,7 @@ class MessageRouterSubscriberImplTest { private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault(); private final MessageRouterSubscriber - cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); - + cut; private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() .name("sample topic") @@ -96,7 +104,10 @@ class MessageRouterSubscriberImplTest { .rawBody("{}".getBytes()) .headers(HashMultimap.withSeq().empty()) .build(); - + private MessageRouterSubscriberImplTest() throws Exception{ + cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); + } + @Test void getWithProperRequest_shouldReturnCorrectResponse() { // given |