aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java')
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java98
1 files changed, 96 insertions, 2 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index e862d849..e2833fe5 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -20,20 +20,29 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
import com.google.gson.JsonObject;
+import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -128,6 +137,91 @@ class CbsClientImplIT {
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWithStreamsParsing() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .consumeNextWith(kafkaSink -> {
+ assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
+ assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
+ assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
+ })
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingUsingSwitch() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ // TODO: Use these parsers below
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
+
+ // when
+ final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json -> {
+ final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
+ .groupBy(RawDataStream::type);
+
+ final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ .map(kafkaSinkParser::unsafeParse);
+ final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ .map(mrSinkParser::unsafeParse);
+
+ assertThat(allKafkaSinks.size())
+ .describedAs("Number of kafka sinks")
+ .isEqualTo(2);
+ assertThat(allMrSinks.size())
+ .describedAs("Number of DMAAP-MR sinks")
+ .isEqualTo(1);
+
+ return true;
+ })
+ .then();
+
+ // then
+ StepVerifier.create(result)
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .expectErrorSatisfies(ex -> {
+ assertThat(ex).isInstanceOf(IllegalArgumentException.class);
+ assertThat(ex).hasMessageContaining("Invalid stream type");
+ assertThat(ex).hasMessageContaining("message_router");
+ assertThat(ex).hasMessageContaining("kafka");
+ })
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}