aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java90
1 files changed, 65 insertions, 25 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 3d43e817..48a12455 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
@@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
+@ExtendWith(SystemStubsExtension.class)
@Testcontainers
class MessageRouterSubscriberIT {
@Container
@@ -71,16 +80,11 @@ class MessageRouterSubscriberIT {
private static String EVENTS_PATH;
private static String PROXY_MOCK_EVENTS_PATH;
+ private static final long REPEAT_SUBSCRIPTION = 20;
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String CONSUMER_GROUP = "group1";
private static final String CONSUMER_ID = "consumer200";
- private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
- "{" +
- "\"mrstatus\":3001," +
- "\"helpURL\":\"http://onap.readthedocs.io\"," +
- "\"message\":\"No such topic exists.-[%s]\"," +
- "\"status\":404" +
- "}";
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ "{"
+ "\"requestError\":"
@@ -94,22 +98,44 @@ class MessageRouterSubscriberIT {
+ "}"
+ "}";
- private MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+ private MessageRouterPublisher publisher;
+ private MessageRouterSubscriber subscriber;
+ Mono<MessageRouterSubscribeResponse> response;
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
@BeforeAll
static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ //sleep introduced to wait till all containers are started
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
@BeforeEach
void set() {
MOCK_SERVER_CLIENT.reset();
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("kafka.auto.offset.reset","earliest");
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ response=null;
+ }
+
+ @AfterEach
+ void afterEach() {
+ if(publisher != null)
+ publisher.close();
+ if(subscriber != null)
+ subscriber.close();
}
-
@Test
void subscriber_shouldHandleNoSuchTopicException() {
//given
@@ -144,11 +170,13 @@ class MessageRouterSubscriberIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -171,11 +199,14 @@ class MessageRouterSubscriberIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
+
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -183,6 +214,7 @@ class MessageRouterSubscriberIT {
.verify();
}
+ @Disabled
@Test
void subscriber_shouldExtractItemsFromResponse() {
//given
@@ -208,7 +240,8 @@ class MessageRouterSubscriberIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldSubscribeToTopic() {
//given
@@ -235,7 +268,8 @@ class MessageRouterSubscriberIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldHandleTimeoutException() {
//given
@@ -261,6 +295,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
}
+ @Disabled
@Test
void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
//given
@@ -298,6 +333,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
//given
@@ -336,6 +372,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
//given
@@ -383,6 +420,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Disabled
@Test
void subscriber_shouldHandleLastRetryError500() {
//given
@@ -416,6 +454,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() {
//given
@@ -445,6 +484,7 @@ class MessageRouterSubscriberIT {
.verify(TIMEOUT);
}
+ @Disabled
@Test
void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() {
//given