summaryrefslogtreecommitdiffstats
path: root/rest-services
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services')
-rw-r--r--rest-services/aai-client/pom.xml10
-rw-r--r--rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/config/AaiClientConfiguration.java36
-rw-r--r--rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Request.java28
-rw-r--r--rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transaction.java49
-rw-r--r--rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transactions.java52
-rw-r--r--rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/AaiClientConfigurations.java4
-rw-r--r--rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/TransactionsTest.java67
-rw-r--r--rest-services/aai-client/src/test/resources/transaction.json11
-rw-r--r--rest-services/cbs-client/pom.xml4
-rw-r--r--rest-services/dmaap-client/pom.xml4
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java13
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java8
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java50
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java207
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java275
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java325
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java125
-rw-r--r--rest-services/http-client/pom.xml4
-rw-r--r--rest-services/pom.xml2
19 files changed, 650 insertions, 624 deletions
diff --git a/rest-services/aai-client/pom.xml b/rest-services/aai-client/pom.xml
index 7c63f4b0..2b51227e 100644
--- a/rest-services/aai-client/pom.xml
+++ b/rest-services/aai-client/pom.xml
@@ -28,7 +28,10 @@
<artifactId>ssl</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
@@ -49,5 +52,10 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/config/AaiClientConfiguration.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/config/AaiClientConfiguration.java
index 00d323cc..ff74f9b7 100644
--- a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/config/AaiClientConfiguration.java
+++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/config/AaiClientConfiguration.java
@@ -20,22 +20,35 @@
package org.onap.dcaegen2.services.sdk.rest.services.aai.client.config;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-
import java.io.Serializable;
import java.util.Map;
-
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
@Value.Immutable(prehash = true)
@Value.Style(builder = "new")
@Gson.TypeAdapters
public abstract class AaiClientConfiguration implements Serializable {
+ private static final String PNF_PATH = "/network/pnfs/pnf";
+ private static final String SERVICE_INSTANCE_PATH = "/business/customers/customer/${customer}/service-subscriptions/service-subscription/${serviceType}/service-instances/service-instance/${serviceInstanceId}";
+
private static final long serialVersionUID = 1L;
@Value.Parameter
- public abstract String pnfUrl();
+ @Value.Default
+ public String baseUrl() {
+ return "";
+ }
+
+ /**
+ * Please use baseUrl() instead
+ */
+ @Deprecated
+ @Value.Default
+ public String pnfUrl() {
+ return baseUrl() + PNF_PATH;
+ }
@Value.Parameter
public abstract String aaiUserName();
@@ -46,8 +59,14 @@ public abstract class AaiClientConfiguration implements Serializable {
@Value.Parameter
public abstract Boolean aaiIgnoreSslCertificateErrors();
- @Value.Parameter
- public abstract String aaiServiceInstancePath();
+ /**
+ * Please use baseUrl() instead
+ */
+ @Deprecated
+ @Value.Default
+ public String aaiServiceInstancePath() {
+ return SERVICE_INSTANCE_PATH;
+ }
@Value.Parameter
public abstract Map<String, String> aaiHeaders();
@@ -66,5 +85,4 @@ public abstract class AaiClientConfiguration implements Serializable {
@Value.Parameter
public abstract Boolean enableAaiCertAuth();
-
-}
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Request.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Request.java
new file mode 100644
index 00000000..cbe3b205
--- /dev/null
+++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Request.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http;
+
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+
+public interface Request {
+ HttpMethod method();
+ String uri();
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transaction.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transaction.java
new file mode 100644
index 00000000..902282be
--- /dev/null
+++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transaction.java
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http;
+
+import com.google.gson.JsonObject;
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * @see Transactions
+ */
+@Value.Immutable
+@Gson.TypeAdapters(fieldNamingStrategy = true)
+public interface Transaction {
+
+ enum Action {
+ @SerializedName("put") PUT,
+ @SerializedName("patch") PATCH,
+ @SerializedName("delete") DELETE
+ }
+
+ Action action();
+
+ String uri();
+
+ @Value.Default
+ default JsonObject body() {
+ return new JsonObject();
+ }
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transactions.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transactions.java
new file mode 100644
index 00000000..a21c5617
--- /dev/null
+++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/Transactions.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.POST;
+
+import com.google.gson.annotations.SerializedName;
+import java.util.List;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+
+/**
+ * @see Transaction
+ * @see <a href="https://docs.onap.org/en/casablanca/submodules/aai/aai-common.git/docs/AAI%20REST%20API%20Documentation/bulkApi.html">AAI
+ * Bulk API</a>
+ */
+@Value.Immutable
+@Gson.TypeAdapters(fieldNamingStrategy = true)
+public interface Transactions extends Request {
+
+ String BULK_SINGLE_TRANSACTION = "/bulk/single-transaction";
+
+ @SerializedName("operations")
+ List<Transaction> operations();
+
+ default HttpMethod method() {
+ return POST;
+ }
+
+ default String uri() {
+ return BULK_SINGLE_TRANSACTION;
+ }
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/AaiClientConfigurations.java b/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/AaiClientConfigurations.java
index d56348c8..23bfb171 100644
--- a/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/AaiClientConfigurations.java
+++ b/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/AaiClientConfigurations.java
@@ -43,7 +43,7 @@ public final class AaiClientConfigurations {
private static AaiClientConfiguration validConfiguration(Map<String, String> headers, boolean secure) {
return new ImmutableAaiClientConfiguration.Builder()
- .pnfUrl("some-url")
+ .baseUrl("https://aai.onap.svc.cluster.local:8443/aai/v12")
.aaiUserName("sample-username")
.aaiUserPassword("sample-password")
.aaiIgnoreSslCertificateErrors(false)
@@ -56,4 +56,4 @@ public final class AaiClientConfigurations {
.aaiServiceInstancePath("sample-instance-path")
.build();
}
-}
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/TransactionsTest.java b/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/TransactionsTest.java
new file mode 100644
index 00000000..70d3653b
--- /dev/null
+++ b/rest-services/aai-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/TransactionsTest.java
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.Transaction.Action.PUT;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+
+class TransactionsTest {
+
+ @Test
+ void shouldBuildTransactionRequest() throws IOException {
+ // given
+ JsonObject payload = new JsonObject();
+ payload.addProperty("link-name", "foo");
+
+ ImmutableTransactions transactions = ImmutableTransactions
+ .builder()
+ .addOperations(ImmutableTransaction.builder()
+ .action(PUT)
+ .uri("/network/logical-links/logical-link/foo")
+ .body(payload)
+ .build())
+ .build();
+
+ // when
+ Gson gson = new Gson().newBuilder().create();
+ String transaction = gson.toJson(transactions);
+
+ // then
+ String expectedJson = getJsonFromFile("transaction.json");
+ assertThat(transaction).isEqualToIgnoringWhitespace(expectedJson);
+ }
+
+ private String getJsonFromFile(String file) throws IOException {
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(
+ Objects.requireNonNull(TransactionsTest.class.getClassLoader().getResourceAsStream(file))))) {
+ return br.lines().collect(Collectors.joining(System.lineSeparator()));
+ }
+ }
+} \ No newline at end of file
diff --git a/rest-services/aai-client/src/test/resources/transaction.json b/rest-services/aai-client/src/test/resources/transaction.json
new file mode 100644
index 00000000..5cbb5621
--- /dev/null
+++ b/rest-services/aai-client/src/test/resources/transaction.json
@@ -0,0 +1,11 @@
+{
+ "operations": [
+ {
+ "action": "put",
+ "uri": "/network/logical-links/logical-link/foo",
+ "body": {
+ "link-name": "foo"
+ }
+ }
+ ]
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml
index 7d044dce..538950fa 100644
--- a/rest-services/cbs-client/pom.xml
+++ b/rest-services/cbs-client/pom.xml
@@ -28,6 +28,10 @@
<artifactId>model</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml
index 86a4ab07..a8ee7391 100644
--- a/rest-services/dmaap-client/pom.xml
+++ b/rest-services/dmaap-client/pom.xml
@@ -46,6 +46,10 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index 2f2e4214..1edaf72f 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -24,6 +24,9 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Com
import com.google.gson.Gson;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -72,10 +75,18 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
final ImmutableMessageRouterSubscribeResponse.Builder builder =
ImmutableMessageRouterSubscribeResponse.builder();
return httpResponse.successful()
- ? builder.items(httpResponse.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class)).build()
+ ? builder.items(getAsJsonElements(httpResponse)).build()
: builder.failReason(extractFailReason(httpResponse)).build();
}
+ private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){
+ JsonParser parser = new JsonParser();
+
+ JsonArray bodyAsJsonArray = httpResponse
+ .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
+
+ return List.ofAll(bodyAsJsonArray).map(arrayElement -> parser.parse(arrayElement.getAsString()));
+ }
private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
index 3680ca60..3dd49cb3 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
@@ -21,9 +21,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
-import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -33,11 +33,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
public interface MessageRouterSubscribeResponse extends DmaapResponse {
@Value.Default
- default JsonArray items() { return new JsonArray(); }
+ default List<JsonElement> items() { return List.empty();}
@Value.Derived
default boolean hasElements() {
- return items().size() > 0;
+ return !items().isEmpty();
}
@Value.Derived
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
new file mode 100644
index 00000000..7e6b0d48
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import java.io.File;
+import java.net.URL;
+import org.testcontainers.containers.DockerComposeContainer;
+
+final class DMaapContainer {
+ private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
+ MR_COMPOSE_RESOURCE_NAME);
+ static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+ static final String DMAAP_SERVICE_NAME = "dmaap";
+
+ private DMaapContainer() {}
+
+ static DockerComposeContainer createContainerInstance(){
+ return new DockerComposeContainer(
+ new File(DOCKER_COMPOSE_FILE_PATH))
+ .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
+ }
+
+ private static String getDockerComposeFilePath(String resourceName){
+ URL resource = DMaapContainer.class.getClassLoader()
+ .getResource(resourceName);
+
+ if(resource != null) return resource.getFile();
+ else throw new DockerComposeNotFoundException(String
+ .format("File %s does not exist", resourceName));
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index 9fbd63c8..c746bfec 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -20,97 +20,68 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterPublisherIT {
-
- private static final String ERROR_MESSAGE = "Something went wrong";
- private static final String TEXT_PLAIN_CONTENT_TYPE = "text/plain";
- private static final String JSON_CONTENT_TYPE = "application/json";
- private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
- private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
- private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
- private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
- private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
- private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
- .map(JsonPrimitive::new);
- private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
- private MessageRouterPublisher sut = DmaapClientFactory
+ private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ + "{"
+ + "\"mrstatus\":5007,"
+ + "\"helpURL\":\"http://onap.readthedocs.io\","
+ + "\"message\":\"Error while publishing data to topic.:%s."
+ + "Successfully published number of messages :0."
+ + "Expected { to start an object.\",\"status\":400"
+ + "}";
+ private static String EVENTS_PATH;
+ private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@BeforeAll
static void setUp() {
- server = DummyHttpServer.start(routes ->
- routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
- .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
- sendError(resp, 400, ERROR_MESSAGE))
- .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
- sendError(resp, 404, ERROR_MESSAGE))
- .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE))
- );
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
-
+ final String topic = "TOPIC";
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
//when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleBadRequestError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
- JSON_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "400 Bad Request\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -120,34 +91,17 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleUnauthorizedError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "401 Unauthorized\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleForbiddenError(){
+ void publisher_shouldHandleBadRequestError(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "403 Forbidden\n%s", ERROR_MESSAGE);
+ final String topic = "TOPIC2";
+ final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
+ final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -157,64 +111,51 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleNotFoundError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "404 Not Found\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishSingleMessage(){
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void publisher_shouldHandleInternalServerError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "500 Internal Server Error\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishMultipleMessages(){
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
- }
-
-
- private MessageRouterPublishRequest createMRRequest(String topicPath, String contentType){
- final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
- topicPath)
- )
- .build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .contentType(contentType)
- .build();
- }
-
- private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs){
- return ImmutableMessageRouterPublishResponse
- .builder()
- .failReason(String.format(failReasonFormat, formatArgs))
- .build();
+ .verify();
}
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
deleted file mode 100644
index 1f0fdafd..00000000
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.io.File;
-import java.net.URL;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-@Testcontainers
-class MessageRouterSubscriberCIT {
- private static final Gson gson = new Gson();
- private static final JsonParser parser = new JsonParser();
- private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
- private static final String CONSUMER_GROUP = "group1";
- private static final String CONSUMER_ID = "consumer200";
- private static final String DMAAP_SERVICE_NAME = "dmaap";
- private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
- private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
- MR_COMPOSE_RESOURCE_NAME);
- private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
- "{" +
- "\"mrstatus\":3001," +
- "\"helpURL\":\"http://onap.readthedocs.io\"," +
- "\"message\":\"No such topic exists.-[%s]\"," +
- "\"status\":404" +
- "}";
-
- @Container
- private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
- new File(DOCKER_COMPOSE_FILE_PATH))
- .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
-
- private static String EVENTS_PATH;
-
- private MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
-
- @BeforeAll
- static void setUp() {
- EVENTS_PATH = String.format("http://%s:%d/events",
- CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
- CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
- }
-
- @Test
- void subscriber_shouldHandleNoSuchTopicException() {
- //given
- final String topic = "newTopic";
- final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
- final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
- final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .failReason(expectedFailReason)
- .build();
-
- //when
- Mono<MessageRouterSubscribeResponse> response = subscriber
- .get(mrSubscribeRequest);
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriberShouldHandleSingleItemResponse(){
- //given
- final String topic = "TOPIC";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
- final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
- final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems)
- .build();
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify();
- }
-
- @Test
- void subscriber_shouldHandleMultipleItemsResponse() {
- //given
- final String topic = "TOPIC2";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
- final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
- final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems)
- .build();
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify();
- }
-
- @Test
- void subscriber_shouldExtractItemsFromResponse() {
- //given
- final String topic = "TOPIC3";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
-
- //then
- StepVerifier.create(result)
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldSubscribeToTopic(){
- //given
- final String topic = "TOPIC4";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
- .map(JsonElement::getAsString));
-
- //then
- StepVerifier.create(result.take(2))
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- private static String getDockerComposeFilePath(String resourceName){
- URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
- .getResource(resourceName);
-
- if(resource != null) return resource.getFile();
- else throw new DockerComposeNotFoundException(String
- .format("File %s does not exist", resourceName));
- }
-
- private static MessageRouterPublishRequest createMRPublishRequest(String topic){
- MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
- .build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .build();
- }
-
- private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
- ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
- .build();
-
- return ImmutableMessageRouterSubscribeRequest
- .builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(CONSUMER_ID)
- .build();
- }
-
- private void registerTopic(MessageRouterPublishRequest publishRequest,
- MessageRouterSubscribeRequest subscribeRequest) {
- final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
- .map(parser::parse).map(JsonElement::getAsJsonObject);
-
- publisher.put(publishRequest, jsonMessageBatch).blockLast();
- subscriber.get(subscribeRequest).block();
- }
-
- private JsonArray getAsJsonArray(List<String> list) {
- String listsJsonString = gson.toJson(list);
- return new JsonParser().parse(listsJsonString).getAsJsonArray();
- }
-
- private static Flux<JsonObject> jsonBatch(List<String> messages){
- return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
- }
-}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index a2c000f5..c2e96b58 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -20,104 +20,73 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterSubscriberIT {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final String ERROR_MESSAGE = "Something went wrong";
private static final String CONSUMER_GROUP = "group1";
- private static final String SUCCESS_CONSUMER_ID = "consumer200";
- private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
- private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
- private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
- private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
- private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
-
- private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
-
- private static final String SUCCESS_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
- private static final String FAILING_WITH_401_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
- private static final String FAILING_WITH_403_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
- private static final String FAILING_WITH_409_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
- private static final String FAILING_WITH_429_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
- private static final String FAILING_WITH_500_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
-
- private static MessageRouterSubscribeRequest mrSuccessRequest;
- private static MessageRouterSubscribeRequest mrFailingRequest;
- private MessageRouterSubscriber sut = DmaapClientFactory
+ private static final String CONSUMER_ID = "consumer200";
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
+ "{" +
+ "\"mrstatus\":3001," +
+ "\"helpURL\":\"http://onap.readthedocs.io\"," +
+ "\"message\":\"No such topic exists.-[%s]\"," +
+ "\"status\":404" +
+ "}";
+
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+
+ private static String EVENTS_PATH;
+
+ private MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static MessageRouterSource sourceDefinition;
@BeforeAll
static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
-
- sourceDefinition = createMessageRouterSource(server);
-
- mrSuccessRequest = createSuccessRequest();
-
- mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
- }
-
- @Test
- void subscriber_shouldGetCorrectResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrSuccessRequest);
-
- JsonArray expectedItems = new JsonArray();
- expectedItems.add("I");
- expectedItems.add("like");
- expectedItems.add("pizza");
-
- MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems)
- .build();
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
- void subscriber_shouldGetUnauthorizedErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleNoSuchTopicException() {
+ //given
+ final String topic = "newTopic";
+ final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+ String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber
+ .get(mrSubscribeRequest);
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
@@ -125,151 +94,111 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriber_shouldGetForbiddenErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
+ void subscriberShouldHandleSingleItemResponse(){
+ //given
+ final String topic = "TOPIC";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetConflictErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("409 Conflict\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleMultipleItemsResponse() {
+ //given
+ final String topic = "TOPIC2";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetTooManyRequestsErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetInternalServerErrorResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrFailingRequest);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldParseCorrectResponse() {
- final Flux<String> result = sut
- .getElements(mrSuccessRequest)
- .map(JsonElement::getAsString);
-
+ void subscriber_shouldExtractItemsFromResponse() {
+ //given
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
+
+ //then
StepVerifier.create(result)
- .expectNext("I", "like", "pizza")
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
@Test
- void subscriber_shouldParseErrorResponse(){
- Flux<String> result = sut
- .getElements(mrFailingRequest)
- .map(JsonElement::getAsString);
-
- StepVerifier.create(result)
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldSubscribeCorrectly(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectNext("I", "like")
+ void subscriber_shouldSubscribeToTopic(){
+ //given
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
+
+ //then
+ StepVerifier.create(result.take(2))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
- @Test
- void subscriber_shouldParseErrorWhenSubscribed(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
- private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
- return routes
- .get(SUCCESS_RESP_PATH, (req, resp) ->
- sendResource(resp, "/sample-mr-subscribe-response.json"))
- .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
- sendError(resp, 409, ERROR_MESSAGE))
- .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
- sendError(resp, 429, ERROR_MESSAGE))
- .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE));
- }
- private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
- return ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
- .build();
- }
-
- private static MessageRouterSubscribeRequest createSuccessRequest(){
- return ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(SUCCESS_CONSUMER_ID)
- .build();
- }
-
- private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
- return ImmutableMessageRouterSubscribeRequest
- .builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(consumerId)
- .build();
- }
-
- private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
- return ImmutableMessageRouterSubscribeResponse
- .builder()
- .failReason(failReason)
- .build();
- }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
new file mode 100644
index 00000000..8695b727
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
@@ -0,0 +1,125 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import reactor.core.publisher.Flux;
+
+final class MessageRouterTestsUtils {
+ private static final JsonParser parser = new JsonParser();
+ private MessageRouterTestsUtils() {}
+
+ static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .build();
+ }
+
+ static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages){
+ return messages.map(parser::parse);
+ }
+
+ static JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
+
+ static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ }
+
+ static Flux<JsonPrimitive> plainBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(JsonPrimitive::new);
+ }
+
+ static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+ MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
+
+ publisher.put(publishRequest, jsonMessageBatch).blockLast();
+ subscriber.get(subscribeRequest).block();
+ }
+}
diff --git a/rest-services/http-client/pom.xml b/rest-services/http-client/pom.xml
index 1d321058..ba6b2fd0 100644
--- a/rest-services/http-client/pom.xml
+++ b/rest-services/http-client/pom.xml
@@ -58,6 +58,10 @@
<artifactId>vavr</artifactId>
</dependency>
<dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</dependency>
diff --git a/rest-services/pom.xml b/rest-services/pom.xml
index 9721f089..a3b7013c 100644
--- a/rest-services/pom.xml
+++ b/rest-services/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>