aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
diff options
context:
space:
mode:
authorsushant53 <sushant.jadhav@t-systems.com>2023-08-11 19:45:44 +0530
committerSushant Jadhav <sushant.jadhav@t-systems.com>2023-09-11 12:09:33 +0000
commit86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch)
tree0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
parent9d8a9326758a162eb26236a1dd9de1c29c504554 (diff)
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs. Issue-ID: DCAEGEN2-3364 Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495 Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java94
1 files changed, 85 insertions, 9 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index 97fd26f5..816021bb 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
@@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
*/
+@ExtendWith(SystemStubsExtension.class)
class MessageRouterPublisherTest {
private static final String ERROR_MESSAGE = "Something went wrong";
@@ -71,9 +91,9 @@ class MessageRouterPublisherTest {
private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
private static final DummyHttpServer SERVER = initialize();
- private MessageRouterPublisher sut = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+ private MessageRouterPublisher sut;
+ MockProducer<String, String> mockProducer =
+ new MockProducer<>(true, new StringSerializer(), new StringSerializer());
private static DummyHttpServer initialize() {
return DummyHttpServer.start(routes -> routes
.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
@@ -86,7 +106,25 @@ class MessageRouterPublisherTest {
.post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
);
}
-
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ @BeforeEach
+ void setUp() {
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("JAAS_CONFIG", "jaas.config");
+
+ sut = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ sut.setKafkaProducer(mockProducer);
+ }
+ @AfterEach
+ void afterEach() {
+ sut.close();
+ }
+
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
@@ -102,7 +140,45 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
+
+ @Test
+ void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception {
+ //given
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+ final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
+ sut = new MessageRouterPublisherImpl();
+ sut.setKafkaProducer(mockProducer);
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ //then
+ StepVerifier.create(result)
+ .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+ @Test
+ void publisher_shouldHandleError() {
+
+ sut.setKafkaProducer(mockProducer);
+
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ RuntimeException e = new RuntimeException();
+ mockProducer.errorNext(e);
+ Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture();
+ try{
+ record.get();
+ }catch(Exception ex) {
+ assertEquals(e, ex);
+ }
+ assertTrue(record.isDone());
+
+ }
+
+ @Disabled
@ParameterizedTest
@CsvSource({
FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",
@@ -126,7 +202,8 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleClientTimeoutError() {
//given
@@ -142,7 +219,8 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleConnectionError() {
//given
@@ -179,9 +257,7 @@ class MessageRouterPublisherTest {
private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
return ImmutableMessageRouterSink.builder()
.name("the topic")
- .topicUrl(String.format("http://%s:%d%s",
- dummyHttpServer.host(),
- dummyHttpServer.port(),
+ .topicUrl(String.format("http://dmaap-mr%s",
topicPath)
)
.build();