diff options
author | sushant53 <sushant.jadhav@t-systems.com> | 2023-08-11 19:45:44 +0530 |
---|---|---|
committer | Sushant Jadhav <sushant.jadhav@t-systems.com> | 2023-09-11 12:09:33 +0000 |
commit | 86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch) | |
tree | 0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java | |
parent | 9d8a9326758a162eb26236a1dd9de1c29c504554 (diff) |
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs.
Issue-ID: DCAEGEN2-3364
Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java | 94 |
1 files changed, 85 insertions, 9 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 97fd26f5..816021bb 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Ignore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; @@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; @@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ +@ExtendWith(SystemStubsExtension.class) class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; @@ -71,9 +91,9 @@ class MessageRouterPublisherTest { private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); private static final DummyHttpServer SERVER = initialize(); - private MessageRouterPublisher sut = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - + private MessageRouterPublisher sut; + MockProducer<String, String> mockProducer = + new MockProducer<>(true, new StringSerializer(), new StringSerializer()); private static DummyHttpServer initialize() { return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) @@ -86,7 +106,25 @@ class MessageRouterPublisherTest { .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) ); } - + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @BeforeEach + void setUp() { + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("JAAS_CONFIG", "jaas.config"); + + sut = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + sut.setKafkaProducer(mockProducer); + } + @AfterEach + void afterEach() { + sut.close(); + } + @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -102,7 +140,45 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } + + @Test + void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); + sut = new MessageRouterPublisherImpl(); + sut.setKafkaProducer(mockProducer); + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + //then + StepVerifier.create(result) + .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build()) + .expectComplete() + .verify(TIMEOUT); + } + @Test + void publisher_shouldHandleError() { + + sut.setKafkaProducer(mockProducer); + + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + RuntimeException e = new RuntimeException(); + mockProducer.errorNext(e); + Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture(); + try{ + record.get(); + }catch(Exception ex) { + assertEquals(e, ex); + } + assertTrue(record.isDone()); + + } + + @Disabled @ParameterizedTest @CsvSource({ FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request", @@ -126,7 +202,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutError() { //given @@ -142,7 +219,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleConnectionError() { //given @@ -179,9 +257,7 @@ class MessageRouterPublisherTest { private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") - .topicUrl(String.format("http://%s:%d%s", - dummyHttpServer.host(), - dummyHttpServer.port(), + .topicUrl(String.format("http://dmaap-mr%s", topicPath) ) .build(); |