aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java52
1 files changed, 23 insertions, 29 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
index 1f0fdafd..32b77a1d 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
@@ -21,16 +21,13 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.io.File;
import java.net.URL;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
@@ -53,7 +50,6 @@ import reactor.test.StepVerifier;
@Testcontainers
class MessageRouterSubscriberCIT {
- private static final Gson gson = new Gson();
private static final JsonParser parser = new JsonParser();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
@@ -120,9 +116,9 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
- final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
.items(expectedItems)
@@ -148,13 +144,13 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
- final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
- .items(expectedItems)
+ .items(expectedElements)
.build();
//when
@@ -177,19 +173,19 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
//then
StepVerifier.create(result)
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
@@ -201,20 +197,20 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
- .map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
//then
StepVerifier.create(result.take(2))
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
@@ -255,21 +251,19 @@ class MessageRouterSubscriberCIT {
private void registerTopic(MessageRouterPublishRequest publishRequest,
MessageRouterSubscribeRequest subscribeRequest) {
- final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
- .map(parser::parse).map(JsonElement::getAsJsonObject);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
publisher.put(publishRequest, jsonMessageBatch).blockLast();
subscriber.get(subscribeRequest).block();
}
- private JsonArray getAsJsonArray(List<String> list) {
- String listsJsonString = gson.toJson(list);
- return new JsonParser().parse(listsJsonString).getAsJsonArray();
- }
-
private static Flux<JsonObject> jsonBatch(List<String> messages){
return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
}
+
+ private JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
}