aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java141
1 files changed, 141 insertions, 0 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
new file mode 100644
index 00000000..52946f56
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
@@ -0,0 +1,141 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import reactor.core.publisher.Flux;
+
+
+public final class MessageRouterTestsUtils {
+ private static final JsonParser parser = new JsonParser();
+ private MessageRouterTestsUtils() {}
+
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
+ }
+
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(contentType)
+ .build();
+ }
+
+ public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ public static List<JsonElement> getAsJsonElements(List<String> messages){
+ return messages.map(parser::parse);
+ }
+
+ public static List<JsonObject> getAsJsonObjects(List<String> messages){
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
+ }
+
+ public static JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
+
+ public static Flux<JsonElement> plainBatch(List<String> messages){
+ return Flux.fromIterable(getAsJsonElements(messages));
+ }
+
+ public static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(getAsJsonObjects(messages));
+ }
+
+ public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+ MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
+
+ publisher.put(publishRequest, jsonMessageBatch).blockLast();
+ subscriber.get(subscribeRequest).block();
+ }
+}