summaryrefslogtreecommitdiffstats
path: root/rest-services
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java13
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java8
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java52
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java10
4 files changed, 43 insertions, 40 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index 2f2e4214..1edaf72f 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -24,6 +24,9 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Com
import com.google.gson.Gson;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -72,10 +75,18 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
final ImmutableMessageRouterSubscribeResponse.Builder builder =
ImmutableMessageRouterSubscribeResponse.builder();
return httpResponse.successful()
- ? builder.items(httpResponse.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class)).build()
+ ? builder.items(getAsJsonElements(httpResponse)).build()
: builder.failReason(extractFailReason(httpResponse)).build();
}
+ private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){
+ JsonParser parser = new JsonParser();
+
+ JsonArray bodyAsJsonArray = httpResponse
+ .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
+
+ return List.ofAll(bodyAsJsonArray).map(arrayElement -> parser.parse(arrayElement.getAsString()));
+ }
private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
index 3680ca60..3dd49cb3 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
@@ -21,9 +21,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
-import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -33,11 +33,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
public interface MessageRouterSubscribeResponse extends DmaapResponse {
@Value.Default
- default JsonArray items() { return new JsonArray(); }
+ default List<JsonElement> items() { return List.empty();}
@Value.Derived
default boolean hasElements() {
- return items().size() > 0;
+ return !items().isEmpty();
}
@Value.Derived
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
index 1f0fdafd..32b77a1d 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
@@ -21,16 +21,13 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.io.File;
import java.net.URL;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
@@ -53,7 +50,6 @@ import reactor.test.StepVerifier;
@Testcontainers
class MessageRouterSubscriberCIT {
- private static final Gson gson = new Gson();
private static final JsonParser parser = new JsonParser();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
@@ -120,9 +116,9 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
- final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
.items(expectedItems)
@@ -148,13 +144,13 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
- final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
- .items(expectedItems)
+ .items(expectedElements)
.build();
//when
@@ -177,19 +173,19 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
//then
StepVerifier.create(result)
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
@@ -201,20 +197,20 @@ class MessageRouterSubscriberCIT {
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
- .map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
//then
StepVerifier.create(result.take(2))
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
@@ -255,21 +251,19 @@ class MessageRouterSubscriberCIT {
private void registerTopic(MessageRouterPublishRequest publishRequest,
MessageRouterSubscribeRequest subscribeRequest) {
- final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
- .map(parser::parse).map(JsonElement::getAsJsonObject);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
publisher.put(publishRequest, jsonMessageBatch).blockLast();
subscriber.get(subscribeRequest).block();
}
- private JsonArray getAsJsonArray(List<String> list) {
- String listsJsonString = gson.toJson(list);
- return new JsonParser().parse(listsJsonString).getAsJsonArray();
- }
-
private static Flux<JsonObject> jsonBatch(List<String> messages){
return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
}
+
+ private JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index a2c000f5..225d3539 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -23,8 +23,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -94,14 +95,11 @@ class MessageRouterSubscriberIT {
Mono<MessageRouterSubscribeResponse> response = sut
.get(mrSuccessRequest);
- JsonArray expectedItems = new JsonArray();
- expectedItems.add("I");
- expectedItems.add("like");
- expectedItems.add("pizza");
+ List<String> expectedItems = List.of("I", "like", "pizza");
MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
- .items(expectedItems)
+ .items(expectedItems.map(JsonPrimitive::new))
.build();
StepVerifier.create(response)