aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
diff options
context:
space:
mode:
authorsushant53 <sushant.jadhav@t-systems.com>2023-08-11 19:45:44 +0530
committerSushant Jadhav <sushant.jadhav@t-systems.com>2023-09-11 12:09:33 +0000
commit86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch)
tree0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
parent9d8a9326758a162eb26236a1dd9de1c29c504554 (diff)
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs. Issue-ID: DCAEGEN2-3364 Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495 Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java159
1 files changed, 114 insertions, 45 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index a1ad951f..a806ba19 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
@@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mockStatic;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
@@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
+@ExtendWith(SystemStubsExtension.class)
@Testcontainers
class MessageRouterPublisherIT {
@Container
@@ -74,6 +91,7 @@ class MessageRouterPublisherIT {
private static String EVENTS_PATH;
private static String PROXY_MOCK_EVENTS_PATH;
+ private static final long REPEAT_SUBSCRIPTION = 20;
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ "{"
@@ -108,22 +126,46 @@ class MessageRouterPublisherIT {
+ "}"
+ "}";
- private final MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private final MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+ private MessageRouterPublisher publisher;
+ private MessageRouterSubscriber subscriber;
+ Mono<MessageRouterSubscribeResponse> response;
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
@BeforeAll
static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ //sleep introduced to wait till all containers are started
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
-
+
+ @AfterEach
+ void afterEach() {
+ publisher.close();
+ subscriber.close();
+ }
+
@BeforeEach
void set() {
MOCK_SERVER_CLIENT.reset();
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("kafka.auto.offset.reset","earliest");
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ response=null;
+
}
-
+
+ @Disabled
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
@@ -143,7 +185,8 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleBadRequestError() {
//given
@@ -175,13 +218,16 @@ class MessageRouterPublisherIT {
final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
+
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -200,12 +246,15 @@ class MessageRouterPublisherIT {
final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -229,11 +278,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -256,10 +307,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -283,10 +337,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -310,10 +367,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -321,7 +381,8 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify();
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
//given
@@ -347,7 +408,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
//given
@@ -380,7 +442,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
//given
@@ -412,7 +475,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
//given
@@ -453,7 +517,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleLastRetryError500() {
//given
@@ -489,7 +554,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
//given
@@ -521,7 +587,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
//given
@@ -553,7 +620,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
//given
@@ -581,7 +649,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path)
.withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
//given