diff options
Diffstat (limited to 'rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java')
-rw-r--r-- | rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index a51b87aa..a296c920 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType; +import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA; +import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER; import com.google.gson.JsonObject; -import io.vavr.collection.Map; import io.vavr.collection.Stream; import java.time.Duration; import org.junit.jupiter.api.AfterAll; @@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -182,12 +184,11 @@ class CbsClientImplIT { // when final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request)) .map(json -> { - final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json) - .groupBy(RawDataStream::type); + final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json); - final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty()) + final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA)) .map(kafkaSinkParser::unsafeParse); - final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty()) + final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER)) .map(mrSinkParser::unsafeParse); assertThat(allKafkaSinks.size()) @@ -225,8 +226,8 @@ class CbsClientImplIT { .expectErrorSatisfies(ex -> { assertThat(ex).isInstanceOf(StreamParsingException.class); assertThat(ex).hasMessageContaining("Invalid stream type"); - assertThat(ex).hasMessageContaining("message_router"); - assertThat(ex).hasMessageContaining("kafka"); + assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString()); + assertThat(ex).hasMessageContaining(KAFKA.toString()); }) .verify(Duration.ofSeconds(5)); } |