summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-25 15:32:42 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-26 12:55:04 +0100
commit015668f4a24fcc497151c6142a0bf70717c55f8e (patch)
tree6af75819471a378f203d4849844d2cfb384df444
parent51d3ae2b08dd49029cd9c86bfe8d95d1eef14326 (diff)
Add streams parsing integration tests
Change-Id: I22410b3fb110e47bde123556951bb12af5f34a1c Issue-ID: DCAEGEN2-1315 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--rest-services/cbs-client/pom.xml9
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java3
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java60
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java44
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java16
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java16
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java4
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java151
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java204
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java23
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java103
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java27
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_message_router.json62
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json80
-rw-r--r--rest-services/model/pom.xml80
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java)4
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java)4
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java)2
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java)4
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java)5
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java)5
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java52
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java)4
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java)6
-rw-r--r--rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java)6
-rw-r--r--rest-services/pom.xml1
55 files changed, 1000 insertions, 231 deletions
diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml
index 9544a7fe..34804038 100644
--- a/rest-services/cbs-client/pom.xml
+++ b/rest-services/cbs-client/pom.xml
@@ -24,12 +24,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>io.vavr</groupId>
- <artifactId>vavr</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains</groupId>
- <artifactId>annotations</artifactId>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>model</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
index cbdea005..3e295a0f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
@@ -21,13 +21,11 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
import io.vavr.control.Either;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
-@ExperimentalApi
public class StreamParserError {
private final String message;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
index ca531e82..4fca3d9a 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
-@ExperimentalApi
public class StreamParsingException extends CbsClientException {
private final StreamParserError cause;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
index 648b7a61..e9263f4f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
@@ -23,10 +23,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.Stream;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
/**
* Extract streams from the application configuration represented as GSON JsonObject.
@@ -64,7 +63,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public final class DataStreams {
private DataStreams() {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
index a8ce3644..2fd1a49d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
@@ -21,8 +21,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
/**
* Represents parser taking GSON JsonObject as an input
@@ -30,7 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> {
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
index 7476e976..e117a3c1 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.*;
/**
* Factory methods for GSON-based {@code StreamParser}s
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
index 69016ed8..61afbe4f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
@@ -22,11 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import io.vavr.control.Either;
import io.vavr.control.Try;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
/**
* A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -36,7 +35,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
* @param <S> output data type
* @since 1.1.4
*/
-@ExperimentalApi
public interface StreamParser<T, S extends DataStream> {
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java
new file mode 100644
index 00000000..dfc6344a
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+
+/**
+ * A small collection of predicates usable when filtering {@link RawDataStream}s.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public final class StreamPredicates {
+
+ public StreamPredicates() {
+ }
+
+ /**
+ * Predicate for matching {@link RawDataStream} by name.
+ *
+ * @param name data stream name
+ * @param <T> type of data stream
+ * @return a predicate which returns true only when a stream name is equal to the given name
+ */
+ public static <T> Predicate<RawDataStream<T>> streamWithName(String name) {
+ return stream -> Objects.equals(stream.name(), name);
+ }
+
+ /**
+ * Predicate for matching {@link RawDataStream} by type.
+ *
+ * @param type data stream type
+ * @param <T> type of data stream
+ * @return a predicate which returns true only when a stream type is equal to the given type
+ */
+ public static <T> Predicate<RawDataStream<T>> streamOfType(StreamType type) {
+ return stream -> stream.type() == type;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
index 1148574e..7f3ccf35 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
@@ -25,9 +25,10 @@ import com.google.gson.JsonObject;
import io.vavr.collection.Stream;
import java.io.IOException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -46,9 +47,9 @@ public final class DataStreamUtils {
public static void assertStreamType(
RawDataStream<JsonObject> json,
- String expectedType,
+ StreamType expectedType,
DataStreamDirection expectedDirection) {
- if (!json.type().equals(expectedType)) {
+ if (json.type() != expectedType) {
throw new StreamParsingException(
"Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
}
@@ -71,7 +72,7 @@ public final class DataStreamUtils {
return ImmutableRawDataStream.<JsonObject>builder()
.name(name)
.direction(direction)
- .type(GsonUtils.requiredString(json, "type"))
+ .type(StreamType.parse(GsonUtils.requiredString(json, "type")))
.descriptor(json)
.build();
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
index 0fdec5d8..7776a1ef 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
@@ -35,9 +35,9 @@ import java.util.stream.Collectors;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
index a9dd0c4d..68304cae 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
@@ -25,12 +25,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
public final class StreamsConstants {
- public static final String DATA_ROUTER_TYPE = "data_router";
-
- public static final String MESSAGE_ROUTER_TYPE = "message_router";
-
- public static final String KAFKA_TYPE = "kafka";
-
public static final String DMAAP_INFO_CHILD_NAME = "dmaap_info";
public static final String KAFKA_INFO_CHILD_NAME = "kafka_info";
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java
new file mode 100644
index 00000000..858fd73b
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public class DmaapUtils {
+
+ public static final ImmutableAafCredentials EMPTY_CREDENTIALS = ImmutableAafCredentials.builder().build();
+
+ private DmaapUtils() {
+ }
+
+ public static @Nullable AafCredentials extractAafCredentials(Gson gson, JsonObject input) {
+ final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ return EMPTY_CREDENTIALS.equals(aafCredentials) ? null : aafCredentials;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
index 83ca4cba..4cf7cbec 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
@Override
public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SINK);
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
index d8b7109d..a8800711 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
@Override
public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SOURCE);
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
index c5d254f0..40b8f383 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouter;
import java.util.Objects;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
index 7122d7c5..650161f7 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
index 49871b11..286c4494 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
index b1f037ab..dc2c2e2d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
@@ -49,9 +49,9 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
@Override
public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SINK);
- final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
index e6b964d0..148584a6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -52,9 +52,9 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
@Override
public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SOURCE);
- final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
index ad9b021e..a746fac6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
@@ -23,8 +23,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.Kafka;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
index 4990f80a..4cc28b37 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
index 137964c3..19108286 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
index 00e9b500..1cd3b487 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
@Override
public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SINK);
final JsonObject json = input.descriptor();
final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
index 9465564b..7bdc12c6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
@Override
public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SOURCE);
final JsonObject json = input.descriptor();
final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
index 4cfa33ac..50a004c6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
@@ -27,8 +27,8 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.control.Option;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
new file mode 100644
index 00000000..c57ce027
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MessageRouterSinksIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_message_router.json");
+
+ MessageRouterSinksIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeNoDataSources() {
+ assertThat(DataStreams.namedSources(json)).isEmpty();
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(DataStreams.namedSinks(json)).isNotEmpty();
+ assertThat(DataStreams.namedSinks(json)).hasSize(4);
+ }
+
+ @Test
+ void allSinksShouldBeOfMessageRouterType() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::type).distinct())
+ .containsExactly(StreamType.MESSAGE_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void verifySecMeasurementSink() {
+ // given
+ final String streamName = "sec_measurement";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("111111");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT");
+ }
+
+ @Test
+ void verifySecFaultUnsecureSink() {
+ // given
+ final String streamName = "sec_fault_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecMeasurementUnsecureSink() {
+ // given
+ final String streamName = "sec_measurement_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecFaultSink() {
+ // given
+ final String streamName = "sec_fault";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("222222");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT");
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
new file mode 100644
index 00000000..4508939a
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
@@ -0,0 +1,204 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MixedDmaapStreamsIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_mixed_dmaap.json");
+ final List<RawDataStream<JsonObject>> sources = DataStreams.namedSources(json).toList();
+ final List<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json).toList();
+
+ MixedDmaapStreamsIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(sinks).isNotEmpty();
+ assertThat(sinks).hasSize(3);
+ }
+
+ @Test
+ void thereShouldBeSomeSourcesDefined() {
+ assertThat(sources).isNotEmpty();
+ assertThat(sources).hasSize(3);
+ }
+
+ @Test
+ void allStreamsShouldBeOfProperType() {
+ assertThat(sources.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER, StreamType.MESSAGE_ROUTER);
+ assertThat(sinks.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(sinks.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void sourcesShouldHaveProperDirection() {
+ assertThat(sources.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SOURCE);
+ }
+
+ @Test
+ void verifyDcaeGuestOsSource() {
+ // given
+ final String streamName = "DCAE_GUEST_OS";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_SAM_GUEST_OS");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("811");
+ }
+
+ @Test
+ void verifyDcaeRawDataSource() {
+ // given
+ final String streamName = "DCAE_RAW_DATA";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("812");
+ }
+
+ @Test
+ void verifySecMeasurementOutputSource() {
+ // given
+ final String streamName = "sec-measurement-output";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSource.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSource.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.clientId()).describedAs("client id").isEqualTo("1111");
+ assertThat(parsedSource.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSource.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1");
+ }
+
+ @Test
+ void verifyDcaeVoipPmDataSink() {
+ // given
+ final String streamName = "DCAE_VOIP_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/206");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/206");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("206.518hu");
+ }
+
+ @Test
+ void verifyDcaeGuestOsOSink() {
+ // given
+ final String streamName = "DCAE_GUEST_OS_O";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("axyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/203");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/203");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("203.2od8s");
+ }
+
+
+ @Test
+ void verifyDcaePmDataSink() {
+ // given
+ final String streamName = "DCAE_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23bdce2");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/493");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/493");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("493.eacqs");
+ }
+
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index a51b87aa..a296c920 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import com.google.gson.JsonObject;
-import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
@@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -182,12 +184,11 @@ class CbsClientImplIT {
// when
final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json -> {
- final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
- .groupBy(RawDataStream::type);
+ final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
- final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
.map(kafkaSinkParser::unsafeParse);
- final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
.map(mrSinkParser::unsafeParse);
assertThat(allKafkaSinks.size())
@@ -225,8 +226,8 @@ class CbsClientImplIT {
.expectErrorSatisfies(ex -> {
assertThat(ex).isInstanceOf(StreamParsingException.class);
assertThat(ex).hasMessageContaining("Invalid stream type");
- assertThat(ex).hasMessageContaining("message_router");
- assertThat(ex).hasMessageContaining("kafka");
+ assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
+ assertThat(ex).hasMessageContaining(KAFKA.toString());
})
.verify(Duration.ofSeconds(5));
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
new file mode 100644
index 00000000..a26af446
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class DmaapUtilsTest {
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":null,\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreAbsent() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"whatever\":\"else\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValue_whenBothFieldsAreSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":\"passwd\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").password("passwd").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenOnlyUserIsSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenPasswordIsNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
index 7092de5a..90c69942 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
@@ -19,25 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
class DataRouterSinkParserTest {
@@ -101,8 +98,8 @@ class DataRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -113,7 +110,7 @@ class DataRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
index b2d01309..f704e523 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
@@ -19,27 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
public class DataRouterSourceParserTest {
@@ -100,8 +95,8 @@ public class DataRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class DataRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
index 4d3b88b8..e3182c5c 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -83,8 +79,7 @@ public class MessageRouterSinkParserTest {
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -100,8 +95,8 @@ public class MessageRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class MessageRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
index d497817f..51e56764 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -81,8 +77,7 @@ public class MessageRouterSourceParserTest {
// then
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -98,8 +93,8 @@ public class MessageRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -110,7 +105,7 @@ public class MessageRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
index 5974639c..2e4f71b3 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
@@ -30,10 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
index d255d99a..1e8e3f52 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
@@ -31,10 +31,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
new file mode 100644
index 00000000..d38b0cce
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
@@ -0,0 +1,62 @@
+{
+ "collector.schema.file": "./etc/CommonEventFormat_27.2.json",
+ "collector.service.port": 8080,
+ "collector.dmaap.streamid": "fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert",
+ "collector.schema.checkflag": 1,
+ "tomcat.maxthreads": "200",
+ "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password",
+ "streams_subscribes": {},
+ "services_calls": {},
+ "collector.inputQueue.maxPending": 8096,
+ "header.authflag": 0,
+ "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
+ "collector.service.secure.port": -1,
+ "header.authlist": "userid1,base64encodepwd1|userid2,base64encodepwd2",
+ "collector.keystore.alias": "dynamically generated",
+ "streams_publishes": {
+ "sec_measurement": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "111111",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ },
+ "sec_fault_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_measurement_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_fault": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "222222",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ }
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
new file mode 100644
index 00000000..acc7b987
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
@@ -0,0 +1,80 @@
+{
+
+ "streams_subscribes": {
+ "DCAE_GUEST_OS": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "password": "abc",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_SAM_GUEST_OS",
+ "subscriber_id": "811"
+ }
+ },
+ "DCAE_RAW_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "password": "xyz",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA",
+ "subscriber_id": "812"
+ }
+ },
+ "sec-measurement-output": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "topic_url": "https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1",
+ "client_role": "com.att.dcae.member",
+ "location": "mtn23",
+ "client_id": "1111"
+ },
+ "aaf_username": "aaf_username"
+
+ }
+
+ },
+
+ "streams_publishes": {
+
+ "DCAE_VOIP_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "log_url": "https://dcae-drps/feedlog/206",
+ "publish_url": "https://dcae-drps/publish/206",
+ "location": "mtn23",
+ "password": "xyz",
+ "publisher_id": "206.518hu"
+
+ }
+ },
+
+ "DCAE_GUEST_OS_O": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "axyz",
+ "log_url": "https://dcae-drps/feedlog/203",
+ "publish_url": "https://dcae-drps/publish/203",
+ "location": "mtn23",
+ "password": "abc",
+
+ "publisher_id": "203.2od8s"
+ }
+ },
+
+ "DCAE_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "log_url": "https://dcae-drps/feedlog/493",
+ "publish_url": "https://dcae-drps/publish/493",
+ "location": "mtn23bdce2",
+ "password": "abc",
+ "publisher_id": "493.eacqs"
+ }
+ }
+ }
+
+} \ No newline at end of file
diff --git a/rest-services/model/pom.xml b/rest-services/model/pom.xml
new file mode 100644
index 00000000..51f8ffcb
--- /dev/null
+++ b/rest-services/model/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START====================================
+ ~ DCAEGEN2-SERVICES-SDK
+ ~ =========================================================
+ ~ Copyright (C) 2019 Nokia. All rights reserved.
+ ~ =========================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=====================================
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
+ <version>1.1.4-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>model</artifactId>
+
+ <name>dcaegen2-services-sdk-rest-services-model</name>
+ <description>Rest Services Model</description>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java
index 9fa83bcb..565efa10 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java
@@ -18,14 +18,13 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* Represents the AAF Credentials. Currently it contains only user name and password.
@@ -33,7 +32,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
@Value.Immutable
@Gson.TypeAdapters
public interface AafCredentials {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java
index 1950a304..06fabccd 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java
@@ -18,10 +18,9 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* Represents a named data stream.
@@ -29,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface DataStream {
@Value.Default
default String name() {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java
index 3d05c9a9..240a4c82 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
/**
* The direction of the stream, ie. whether it's input ({@code SOURCE}) or output ({@code SINK}) stream.
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java
index d6bc8000..7f6040ee 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
import org.immutables.value.Value;
@@ -32,7 +32,7 @@ import org.immutables.value.Value;
@Value.Immutable
public interface RawDataStream<T> {
String name();
- String type();
+ StreamType type();
DataStreamDirection direction();
T descriptor();
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java
index 7002fd68..5d1d5873 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java
@@ -18,9 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+package org.onap.dcaegen2.services.sdk.model.streams;
/**
* Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration.
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface SinkStream extends DataStream {
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java
index c5ab8a34..9b68c785 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java
@@ -18,9 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+package org.onap.dcaegen2.services.sdk.model.streams;
/**
* Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration.
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface SourceStream extends DataStream {
}
diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java
new file mode 100644
index 00000000..2e08c82b
--- /dev/null
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.model.streams;
+
+import io.vavr.collection.Stream;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public enum StreamType {
+ MESSAGE_ROUTER("message_router"),
+ DATA_ROUTER("data_router"),
+ KAFKA("kafka"),
+ UNKNOWN("unknown");
+
+ private final String rawType;
+
+ StreamType(String rawType) {
+ this.rawType = rawType;
+ }
+
+ public static StreamType parse(@NotNull String rawType) {
+ return Stream.of(StreamType.values())
+ .find(type -> type.rawType.equals(rawType))
+ .getOrElse(UNKNOWN);
+ }
+
+ @Override
+ public String toString() {
+ return rawType;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java
index 072d4b0b..38adb197 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java
@@ -17,18 +17,16 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import com.google.gson.annotations.SerializedName;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface DataRouter {
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java
index baddb91e..bfe31182 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java
@@ -17,22 +17,20 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
@Gson.TypeAdapters
-@ExperimentalApi
@Value.Immutable
public interface DataRouterSink extends DataRouter, SinkStream {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java
index d089a403..4ba81acb 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java
@@ -17,22 +17,20 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
@Gson.TypeAdapters
-@ExperimentalApi
@Value.Immutable
public interface DataRouterSource extends DataRouter, SourceStream {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java
index 42558cbf..df2cee6d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java
@@ -17,21 +17,19 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import static io.vavr.Predicates.not;
import io.vavr.collection.List;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface Kafka {
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java
index bd6ab1ca..2c397615 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java
@@ -17,17 +17,15 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
@Value.Immutable
public interface KafkaSink extends Kafka, SinkStream {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java
index 78f5c3af..799d3af5 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java
@@ -17,18 +17,16 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
@Value.Immutable
public interface KafkaSource extends Kafka, SourceStream {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java
index 3cca5134..3a6ba0f6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java
@@ -17,18 +17,16 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import com.google.gson.annotations.SerializedName;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface MessageRouter {
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java
index 3af79638..1820775b 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java
@@ -17,17 +17,15 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
@Value.Immutable
public interface MessageRouterSink extends MessageRouter, SinkStream {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java
index c7159f26..b92dff1f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java
+++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java
@@ -17,17 +17,15 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
@Value.Immutable
public interface MessageRouterSource extends MessageRouter, SourceStream {
diff --git a/rest-services/pom.xml b/rest-services/pom.xml
index f54ea772..aa8caf27 100644
--- a/rest-services/pom.xml
+++ b/rest-services/pom.xml
@@ -18,6 +18,7 @@
<packaging>pom</packaging>
<modules>
+ <module>model</module>
<module>common-dependency</module>
<module>aai-client</module>
<module>cbs-client</module>