aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/cbs-client/src')
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java3
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java60
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java)32
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java16
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java16
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java13
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java38
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java48
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java38
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java35
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java35
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java59
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java57
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java51
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java79
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java34
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java40
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java63
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java34
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java34
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java151
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java204
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java23
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java103
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java27
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_message_router.json62
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json80
50 files changed, 811 insertions, 835 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
index cbdea005..3e295a0f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
@@ -21,13 +21,11 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
import io.vavr.control.Either;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
-@ExperimentalApi
public class StreamParserError {
private final String message;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
index ca531e82..4fca3d9a 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
@@ -20,13 +20,10 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
-@ExperimentalApi
public class StreamParsingException extends CbsClientException {
private final StreamParserError cause;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
index 648b7a61..e9263f4f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
@@ -23,10 +23,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.Stream;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
/**
* Extract streams from the application configuration represented as GSON JsonObject.
@@ -64,7 +63,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public final class DataStreams {
private DataStreams() {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
index a8ce3644..2fd1a49d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
@@ -21,8 +21,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
/**
* Represents parser taking GSON JsonObject as an input
@@ -30,7 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
-@ExperimentalApi
public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> {
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
index 7476e976..e117a3c1 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.*;
/**
* Factory methods for GSON-based {@code StreamParser}s
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
index 69016ed8..61afbe4f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
@@ -22,11 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import io.vavr.control.Either;
import io.vavr.control.Try;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
/**
* A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -36,7 +35,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
* @param <S> output data type
* @since 1.1.4
*/
-@ExperimentalApi
public interface StreamParser<T, S extends DataStream> {
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java
new file mode 100644
index 00000000..dfc6344a
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+
+/**
+ * A small collection of predicates usable when filtering {@link RawDataStream}s.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public final class StreamPredicates {
+
+ public StreamPredicates() {
+ }
+
+ /**
+ * Predicate for matching {@link RawDataStream} by name.
+ *
+ * @param name data stream name
+ * @param <T> type of data stream
+ * @return a predicate which returns true only when a stream name is equal to the given name
+ */
+ public static <T> Predicate<RawDataStream<T>> streamWithName(String name) {
+ return stream -> Objects.equals(stream.name(), name);
+ }
+
+ /**
+ * Predicate for matching {@link RawDataStream} by type.
+ *
+ * @param type data stream type
+ * @param <T> type of data stream
+ * @return a predicate which returns true only when a stream type is equal to the given type
+ */
+ public static <T> Predicate<RawDataStream<T>> streamOfType(StreamType type) {
+ return stream -> stream.type() == type;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
index 1148574e..7f3ccf35 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
@@ -25,9 +25,10 @@ import com.google.gson.JsonObject;
import io.vavr.collection.Stream;
import java.io.IOException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -46,9 +47,9 @@ public final class DataStreamUtils {
public static void assertStreamType(
RawDataStream<JsonObject> json,
- String expectedType,
+ StreamType expectedType,
DataStreamDirection expectedDirection) {
- if (!json.type().equals(expectedType)) {
+ if (json.type() != expectedType) {
throw new StreamParsingException(
"Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
}
@@ -71,7 +72,7 @@ public final class DataStreamUtils {
return ImmutableRawDataStream.<JsonObject>builder()
.name(name)
.direction(direction)
- .type(GsonUtils.requiredString(json, "type"))
+ .type(StreamType.parse(GsonUtils.requiredString(json, "type")))
.descriptor(json)
.build();
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
index 0fdec5d8..7776a1ef 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
@@ -35,9 +35,9 @@ import java.util.stream.Collectors;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
index a9dd0c4d..68304cae 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
@@ -25,12 +25,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
public final class StreamsConstants {
- public static final String DATA_ROUTER_TYPE = "data_router";
-
- public static final String MESSAGE_ROUTER_TYPE = "message_router";
-
- public static final String KAFKA_TYPE = "kafka";
-
public static final String DMAAP_INFO_CHILD_NAME = "dmaap_info";
public static final String KAFKA_INFO_CHILD_NAME = "kafka_info";
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java
index 9fa83bcb..858fd73b 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java
@@ -18,29 +18,27 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
-
-import com.google.gson.annotations.SerializedName;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
/**
- * Represents the AAF Credentials. Currently it contains only user name and password.
- *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
+ * @since March 2019
*/
-@ExperimentalApi
-@Value.Immutable
-@Gson.TypeAdapters
-public interface AafCredentials {
+public class DmaapUtils {
+
+ public static final ImmutableAafCredentials EMPTY_CREDENTIALS = ImmutableAafCredentials.builder().build();
- @SerializedName(value = "username", alternate = "aaf_username")
- @Nullable String username();
+ private DmaapUtils() {
+ }
- @SerializedName(value = "password", alternate = "aaf_password")
- @Nullable String password();
+ public static @Nullable AafCredentials extractAafCredentials(Gson gson, JsonObject input) {
+ final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ return EMPTY_CREDENTIALS.equals(aafCredentials) ? null : aafCredentials;
+ }
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
index 83ca4cba..4cf7cbec 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
@Override
public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SINK);
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
index d8b7109d..a8800711 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
@Override
public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SOURCE);
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
index c5d254f0..40b8f383 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouter;
import java.util.Objects;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
index 7122d7c5..650161f7 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
index 49871b11..286c4494 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
index b1f037ab..dc2c2e2d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
@@ -49,9 +49,9 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
@Override
public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SINK);
- final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
index e6b964d0..148584a6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -52,9 +52,9 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
@Override
public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SOURCE);
- final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
index ad9b021e..a746fac6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
@@ -23,8 +23,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.Kafka;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
index 4990f80a..4cc28b37 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
index 137964c3..19108286 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
index 00e9b500..1cd3b487 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
@Override
public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+ assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SINK);
final JsonObject json = input.descriptor();
final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
index 9465564b..7bdc12c6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
@Override
public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
- assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+ assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SOURCE);
final JsonObject json = input.descriptor();
final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
index 4cfa33ac..50a004c6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
@@ -27,8 +27,8 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.control.Option;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
deleted file mode 100644
index 1950a304..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
-/**
- * Represents a named data stream.
- *
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface DataStream {
- @Value.Default
- default String name() {
- return "";
- }
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
deleted file mode 100644
index 3d05c9a9..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-/**
- * The direction of the stream, ie. whether it's input ({@code SOURCE}) or output ({@code SINK}) stream.
- *
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-public enum DataStreamDirection {
-
- SINK("streams_publishes"),
- SOURCE("streams_subscribes");
-
- private final String configurationKey;
-
- DataStreamDirection(String configurationKey) {
- this.configurationKey = configurationKey;
- }
-
- /**
- * The configuration key under which the single stream definitions should reside.
- *
- * @return the configuration key
- */
- public String configurationKey() {
- return configurationKey;
- }
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
deleted file mode 100644
index d6bc8000..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.immutables.value.Value;
-
-/**
- * Represents a raw/uninterpreted data stream.
- *
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- * @param <T> type of raw data, eg. JsonObject
- */
-@Value.Immutable
-public interface RawDataStream<T> {
- String name();
- String type();
- DataStreamDirection direction();
- T descriptor();
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
deleted file mode 100644
index 7002fd68..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
-/**
- * Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration.
- * Application can put data to this stream.
- *
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface SinkStream extends DataStream {
-
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
deleted file mode 100644
index c5ab8a34..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
-/**
- * Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration.
- * Application can read data from this stream.
- *
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface SourceStream extends DataStream {
-
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java
deleted file mode 100644
index 072d4b0b..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-
-import com.google.gson.annotations.SerializedName;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface DataRouter {
-
- /**
- * DCAE location for the publisher, used to set up routing.
- */
- @SerializedName("location")
- @Nullable String location();
-
- /**
- * Username
- * <ul>
- * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li>
- * <li>the publisher uses to authenticate to Data Router.</li>
- * </ul>
- */
- @SerializedName("username")
- @Nullable String username();
-
- /**
- * Password
- * <ul>
- * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li>
- * <li>the publisher uses to authenticate to Data Router.</li>
- * </ul>
- */
- @SerializedName("password")
- @Nullable String password();
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java
deleted file mode 100644
index baddb91e..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-
-import com.google.gson.annotations.SerializedName;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@Gson.TypeAdapters
-@ExperimentalApi
-@Value.Immutable
-public interface DataRouterSink extends DataRouter, SinkStream {
-
- /**
- * URL to which the publisher makes Data Router publish requests.
- */
- @SerializedName("publish_url")
- String publishUrl();
-
- /**
- * Publisher id in Data Router
- */
- @SerializedName("publisher_id")
- @Nullable String publisherId();
-
- /**
- * URL from which log data for the feed can be obtained.
- */
- @SerializedName("log_url")
- @Nullable String logUrl();
-
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java
deleted file mode 100644
index d089a403..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-
-import com.google.gson.annotations.SerializedName;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@Gson.TypeAdapters
-@ExperimentalApi
-@Value.Immutable
-public interface DataRouterSource extends DataRouter, SourceStream {
-
- /**
- * URL to which the Data Router should deliver files.
- */
- // TODO: since crucial, we need to verify if it should be non-null
- @SerializedName("delivery_url")
- @Nullable String deliveryUrl();
-
- /**
- * Subscriber id in Data Router.
- */
- @SerializedName("subscriber_id")
- @Nullable String subscriberId();
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
deleted file mode 100644
index 42558cbf..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import static io.vavr.Predicates.not;
-
-import io.vavr.collection.List;
-import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface Kafka {
-
- /**
- * Kafka bootstrap servers as defined in Kafka client documentation under <em>bootstrap.servers</em> configuration
- * key.
- */
- String bootstrapServers();
-
- /**
- * The name of the topic where application should publish or subscribe for the messages.
- */
- String topicName();
-
- /**
- * The credentials to use when authenticating to Kafka cluster or null when connection should be unauthenticated.
- */
- @Nullable AafCredentials aafCredentials();
-
- /**
- * AAF client role that’s requesting publish or subscribe access to the topic.
- */
- @Nullable String clientRole();
-
- /**
- * Client id for given AAF client.
- */
- @Nullable String clientId();
-
- /**
- * The limit on the size of message published to/subscribed from the topic. Can be used to set Kafka client
- * <em>max.request.size</em> configuration property.
- */
- @Value.Default
- default int maxPayloadSizeBytes() {
- return 1024 * 1024;
- }
-
- /**
- * The {@code bootstrapServers} converted to the list of servers' addresses.
- */
- @Value.Derived
- default List<String> bootstrapServerList() {
- return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
- }
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
deleted file mode 100644
index bd6ab1ca..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-@Value.Immutable
-public interface KafkaSink extends Kafka, SinkStream {
-
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
deleted file mode 100644
index 78f5c3af..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-@Value.Immutable
-public interface KafkaSource extends Kafka, SourceStream {
-
- /**
- * A unique string that identifies the consumer group this consumer belongs to as defined in Kafka consumer
- * configuration key <em>group.id</em>.
- */
- @Nullable String consumerGroupId();
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java
deleted file mode 100644
index 3cca5134..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import com.google.gson.annotations.SerializedName;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-public interface MessageRouter {
-
- /**
- * URL for accessing the topic to publish or receive events.
- */
- @SerializedName("topic_url")
- String topicUrl();
-
- /**
- * AAF client role that’s requesting publish or subscribe access to the topic.
- */
- @SerializedName("client_role")
- @Nullable String clientRole();
-
- /**
- * Client id for given AAF client.
- */
- @SerializedName("client_id")
- @Nullable String clientId();
-
- /**
- * DCAE location for the publisher or subscriber, used to set up routing.
- */
- @SerializedName("location")
- @Nullable String location();
-
- /**
- * The AAF credentials.
- */
- @SerializedName("aaf_credentials")
- @Nullable AafCredentials aafCredentials();
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java
deleted file mode 100644
index 3af79638..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-@Value.Immutable
-public interface MessageRouterSink extends MessageRouter, SinkStream {
-
-}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java
deleted file mode 100644
index c7159f26..00000000
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
-
-import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.4
- */
-@ExperimentalApi
-@Value.Immutable
-public interface MessageRouterSource extends MessageRouter, SourceStream {
-
-}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
new file mode 100644
index 00000000..c57ce027
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MessageRouterSinksIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_message_router.json");
+
+ MessageRouterSinksIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeNoDataSources() {
+ assertThat(DataStreams.namedSources(json)).isEmpty();
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(DataStreams.namedSinks(json)).isNotEmpty();
+ assertThat(DataStreams.namedSinks(json)).hasSize(4);
+ }
+
+ @Test
+ void allSinksShouldBeOfMessageRouterType() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::type).distinct())
+ .containsExactly(StreamType.MESSAGE_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void verifySecMeasurementSink() {
+ // given
+ final String streamName = "sec_measurement";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("111111");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT");
+ }
+
+ @Test
+ void verifySecFaultUnsecureSink() {
+ // given
+ final String streamName = "sec_fault_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecMeasurementUnsecureSink() {
+ // given
+ final String streamName = "sec_measurement_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecFaultSink() {
+ // given
+ final String streamName = "sec_fault";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("222222");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT");
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
new file mode 100644
index 00000000..4508939a
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
@@ -0,0 +1,204 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MixedDmaapStreamsIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_mixed_dmaap.json");
+ final List<RawDataStream<JsonObject>> sources = DataStreams.namedSources(json).toList();
+ final List<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json).toList();
+
+ MixedDmaapStreamsIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(sinks).isNotEmpty();
+ assertThat(sinks).hasSize(3);
+ }
+
+ @Test
+ void thereShouldBeSomeSourcesDefined() {
+ assertThat(sources).isNotEmpty();
+ assertThat(sources).hasSize(3);
+ }
+
+ @Test
+ void allStreamsShouldBeOfProperType() {
+ assertThat(sources.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER, StreamType.MESSAGE_ROUTER);
+ assertThat(sinks.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(sinks.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void sourcesShouldHaveProperDirection() {
+ assertThat(sources.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SOURCE);
+ }
+
+ @Test
+ void verifyDcaeGuestOsSource() {
+ // given
+ final String streamName = "DCAE_GUEST_OS";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_SAM_GUEST_OS");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("811");
+ }
+
+ @Test
+ void verifyDcaeRawDataSource() {
+ // given
+ final String streamName = "DCAE_RAW_DATA";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("812");
+ }
+
+ @Test
+ void verifySecMeasurementOutputSource() {
+ // given
+ final String streamName = "sec-measurement-output";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSource.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSource.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.clientId()).describedAs("client id").isEqualTo("1111");
+ assertThat(parsedSource.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSource.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1");
+ }
+
+ @Test
+ void verifyDcaeVoipPmDataSink() {
+ // given
+ final String streamName = "DCAE_VOIP_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/206");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/206");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("206.518hu");
+ }
+
+ @Test
+ void verifyDcaeGuestOsOSink() {
+ // given
+ final String streamName = "DCAE_GUEST_OS_O";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("axyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/203");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/203");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("203.2od8s");
+ }
+
+
+ @Test
+ void verifyDcaePmDataSink() {
+ // given
+ final String streamName = "DCAE_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23bdce2");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/493");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/493");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("493.eacqs");
+ }
+
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index a51b87aa..a296c920 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import com.google.gson.JsonObject;
-import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
@@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -182,12 +184,11 @@ class CbsClientImplIT {
// when
final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json -> {
- final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
- .groupBy(RawDataStream::type);
+ final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
- final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
.map(kafkaSinkParser::unsafeParse);
- final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
.map(mrSinkParser::unsafeParse);
assertThat(allKafkaSinks.size())
@@ -225,8 +226,8 @@ class CbsClientImplIT {
.expectErrorSatisfies(ex -> {
assertThat(ex).isInstanceOf(StreamParsingException.class);
assertThat(ex).hasMessageContaining("Invalid stream type");
- assertThat(ex).hasMessageContaining("message_router");
- assertThat(ex).hasMessageContaining("kafka");
+ assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
+ assertThat(ex).hasMessageContaining(KAFKA.toString());
})
.verify(Duration.ofSeconds(5));
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
new file mode 100644
index 00000000..a26af446
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class DmaapUtilsTest {
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":null,\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreAbsent() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"whatever\":\"else\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValue_whenBothFieldsAreSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":\"passwd\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").password("passwd").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenOnlyUserIsSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenPasswordIsNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
index 7092de5a..90c69942 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
@@ -19,25 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
class DataRouterSinkParserTest {
@@ -101,8 +98,8 @@ class DataRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -113,7 +110,7 @@ class DataRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
index b2d01309..f704e523 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
@@ -19,27 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
public class DataRouterSourceParserTest {
@@ -100,8 +95,8 @@ public class DataRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class DataRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
index 4d3b88b8..e3182c5c 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -83,8 +79,7 @@ public class MessageRouterSinkParserTest {
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -100,8 +95,8 @@ public class MessageRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class MessageRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
index d497817f..51e56764 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -81,8 +77,7 @@ public class MessageRouterSourceParserTest {
// then
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -98,8 +93,8 @@ public class MessageRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -110,7 +105,7 @@ public class MessageRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
index 5974639c..2e4f71b3 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
@@ -30,10 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
index d255d99a..1e8e3f52 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
@@ -31,10 +31,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
new file mode 100644
index 00000000..d38b0cce
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
@@ -0,0 +1,62 @@
+{
+ "collector.schema.file": "./etc/CommonEventFormat_27.2.json",
+ "collector.service.port": 8080,
+ "collector.dmaap.streamid": "fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert",
+ "collector.schema.checkflag": 1,
+ "tomcat.maxthreads": "200",
+ "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password",
+ "streams_subscribes": {},
+ "services_calls": {},
+ "collector.inputQueue.maxPending": 8096,
+ "header.authflag": 0,
+ "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
+ "collector.service.secure.port": -1,
+ "header.authlist": "userid1,base64encodepwd1|userid2,base64encodepwd2",
+ "collector.keystore.alias": "dynamically generated",
+ "streams_publishes": {
+ "sec_measurement": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "111111",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ },
+ "sec_fault_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_measurement_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_fault": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "222222",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ }
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
new file mode 100644
index 00000000..acc7b987
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
@@ -0,0 +1,80 @@
+{
+
+ "streams_subscribes": {
+ "DCAE_GUEST_OS": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "password": "abc",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_SAM_GUEST_OS",
+ "subscriber_id": "811"
+ }
+ },
+ "DCAE_RAW_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "password": "xyz",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA",
+ "subscriber_id": "812"
+ }
+ },
+ "sec-measurement-output": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "topic_url": "https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1",
+ "client_role": "com.att.dcae.member",
+ "location": "mtn23",
+ "client_id": "1111"
+ },
+ "aaf_username": "aaf_username"
+
+ }
+
+ },
+
+ "streams_publishes": {
+
+ "DCAE_VOIP_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "log_url": "https://dcae-drps/feedlog/206",
+ "publish_url": "https://dcae-drps/publish/206",
+ "location": "mtn23",
+ "password": "xyz",
+ "publisher_id": "206.518hu"
+
+ }
+ },
+
+ "DCAE_GUEST_OS_O": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "axyz",
+ "log_url": "https://dcae-drps/feedlog/203",
+ "publish_url": "https://dcae-drps/publish/203",
+ "location": "mtn23",
+ "password": "abc",
+
+ "publisher_id": "203.2od8s"
+ }
+ },
+
+ "DCAE_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "log_url": "https://dcae-drps/feedlog/493",
+ "publish_url": "https://dcae-drps/publish/493",
+ "location": "mtn23bdce2",
+ "password": "abc",
+ "publisher_id": "493.eacqs"
+ }
+ }
+ }
+
+} \ No newline at end of file