aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java')
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java23
1 files changed, 12 insertions, 11 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index a51b87aa..a296c920 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import com.google.gson.JsonObject;
-import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
@@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -182,12 +184,11 @@ class CbsClientImplIT {
// when
final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json -> {
- final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
- .groupBy(RawDataStream::type);
+ final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
- final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
.map(kafkaSinkParser::unsafeParse);
- final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
.map(mrSinkParser::unsafeParse);
assertThat(allKafkaSinks.size())
@@ -225,8 +226,8 @@ class CbsClientImplIT {
.expectErrorSatisfies(ex -> {
assertThat(ex).isInstanceOf(StreamParsingException.class);
assertThat(ex).hasMessageContaining("Invalid stream type");
- assertThat(ex).hasMessageContaining("message_router");
- assertThat(ex).hasMessageContaining("kafka");
+ assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
+ assertThat(ex).hasMessageContaining(KAFKA.toString());
})
.verify(Duration.ofSeconds(5));
}