diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-14 14:34:35 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-19 15:06:15 +0100 |
commit | 565ec734f46a15bb3c87fe02a32613fc86c0eb22 (patch) | |
tree | 71412d433e9e952db2e2f9db6044dc1a9643e9c0 /rest-services/cbs-client/src/test | |
parent | 9b03823dc6ac4bec2e61a4e54d114a518dbb1100 (diff) |
Kafka streams parsers - additions
Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src/test')
16 files changed, 417 insertions, 138 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java index c9ceeaf1..8a5edcc9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java @@ -20,17 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.vavr.collection.List; +import java.math.BigInteger; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; -import java.math.BigInteger; - - -import static org.assertj.core.api.Assertions.assertThat; - class MerkleTreeParserTest { private final MerkleTreeParser cut = new MerkleTreeParser(); diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index e862d849..e2833fe5 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -20,20 +20,29 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString; import com.google.gson.JsonObject; +import io.vavr.collection.Map; import io.vavr.collection.Stream; import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -128,6 +137,91 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + // TODO: Use these parsers below + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> { + final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json) + .groupBy(RawDataStream::type); + + final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty()) + .map(kafkaSinkParser::unsafeParse); + final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty()) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(IllegalArgumentException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining("message_router"); + assertThat(ex).hasMessageContaining("kafka"); + }) + .verify(Duration.ofSeconds(5)); + } + private String sampleConfigValue(JsonObject obj) { return obj.get(SAMPLE_CONFIG_KEY).getAsString(); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 9fd7cc88..617904f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -30,8 +30,8 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java index 6843e0e3..94ff53f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java @@ -31,9 +31,9 @@ import com.google.gson.JsonParser; import java.io.InputStreamReader; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java index d0485f57..7835a5f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java @@ -21,9 +21,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import io.vavr.CheckedFunction0; -import io.vavr.Function0; -import java.io.IOException; -import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java deleted file mode 100644 index 87131285..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.gson.JsonObject; -import java.io.IOException; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; - -/** - * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 - */ -class KafkaSourceParserTest { - - private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); - - @Test - void precondition_assureInstanceOf() { - assertThat(cut).isInstanceOf(KafkaSourceParser.class); - } - - @Test - void shouldParseMinimalKafkaSourceDefinition() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json"); - - // when - final KafkaSource result = cut.unsafeParse(input); - - // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isNull(); - assertThat(result.clientRole()).isNull(); - } -}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java index 398ebcd9..7092de5a 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; @@ -37,6 +41,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea class DataRouterSinkParserTest { + private static final String SAMPLE_LOCATION = "mtc00"; private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz"; private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs"; @@ -44,49 +49,54 @@ class DataRouterSinkParserTest { private static final String SAMPLE_PASSWORD = "some-password"; private static final String SAMPLE_PUBLISHER_ID = "123456"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser(); - private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() - .location(SAMPLE_LOCATION) - .publishUrl(SAMPLE_PUBLISH_URL) - .logUrl(SAMPLE_LOG_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .publisherId(SAMPLE_PUBLISHER_ID) - .build(); - - private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() - .publishUrl(SAMPLE_PUBLISH_URL) - .build(); - @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + // when DataRouterSink result = streamParser.unsafeParse(input); + // then + final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .publishUrl(SAMPLE_PUBLISH_URL) + .logUrl(SAMPLE_LOG_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .publisherId(SAMPLE_PUBLISHER_ID) + .build(); assertThat(result).isEqualTo(fullConfigurationStream); } @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { //given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils + .readSinkFromResource("/streams/data_router_sink_minimal.json"); + // when DataRouterSink result = streamParser.unsafeParse(input); + // then + final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .publishUrl(SAMPLE_PUBLISH_URL) + .build(); assertThat(result).isEqualTo(minimalConfigurationStream); } @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + // when Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -100,9 +110,17 @@ class DataRouterSinkParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + // when Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java index 681fa147..b2d01309 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -28,7 +28,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; import java.io.IOException; @@ -38,54 +42,60 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; public class DataRouterSourceParserTest { + private static final String SAMPLE_LOCATION = "mtc00"; private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path"; private static final String SAMPLE_USER = "some-user"; private static final String SAMPLE_PASSWORD = "some-password"; private static final String SAMPLE_SUBSCRIBER_ID = "789012"; - private static final Gson gson = new Gson(); - - private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() - .location(SAMPLE_LOCATION) - .deliveryUrl(SAMPLE_DELIVERY_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .subscriberId(SAMPLE_SUBSCRIBER_ID) - .build(); - - private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() - .build(); - - private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser(); @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json"); + // when DataRouterSource result = streamParser.unsafeParse(input); + // then + + final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .deliveryUrl(SAMPLE_DELIVERY_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .subscriberId(SAMPLE_SUBSCRIBER_ID) + .build(); assertThat(result).isEqualTo(fullConfigurationStream); } @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils + .readSourceFromResource("/streams/data_router_source_minimal.json"); + // when DataRouterSource result = streamParser.unsafeParse(input); + // then + final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .build(); assertThat(result).isEqualTo(minimalConfigurationStream); } @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + // when Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -99,9 +109,17 @@ public class DataRouterSourceParserTest { @Test void emptyConfiguration_shouldBeParsedToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + // when Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java index 63b04d1d..4d3b88b8 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; @@ -48,16 +52,16 @@ public class MessageRouterSinkParserTest { private static final String SAMPLE_CLIENT_ID = "1500462518108"; private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser(); @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + // when MessageRouterSink result = streamParser.unsafeParse(input); + // then assertThat(result).isInstanceOf(MessageRouterSink.class); assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); @@ -71,10 +75,11 @@ public class MessageRouterSinkParserTest { @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json"); // when MessageRouterSink result = streamParser.unsafeParse(input); + // then assertThat(result).isInstanceOf(MessageRouterSink.class); assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); @@ -86,9 +91,11 @@ public class MessageRouterSinkParserTest { @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + // when Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -102,9 +109,17 @@ public class MessageRouterSinkParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + // when Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java index fea63d66..d497817f 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; @@ -48,16 +52,16 @@ public class MessageRouterSourceParserTest { private static final String SAMPLE_CLIENT_ID = "1500462518108"; private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser(); @Test void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + // when MessageRouterSource result = streamParser.unsafeParse(input); + // then assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); @@ -70,10 +74,11 @@ public class MessageRouterSourceParserTest { @Test void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json"); // when MessageRouterSource result = streamParser.unsafeParse(input); + // then assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); assertThat(result.aafCredentials().username()).isNull(); @@ -84,9 +89,11 @@ public class MessageRouterSourceParserTest { @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json"); + // when Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -100,7 +107,13 @@ public class MessageRouterSourceParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); // when Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); // then diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java index b5481203..5974639c 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java @@ -18,22 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.in; -import static org.junit.jupiter.api.Assertions.*; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.vavr.Function1; import io.vavr.control.Either; import java.io.IOException; -import java.io.InputStreamReader; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; /** @@ -52,7 +51,7 @@ class KafkaSinkParserTest { @Test void shouldParseMinimalKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json"); // when final KafkaSink result = cut.unsafeParse(input); @@ -66,15 +65,19 @@ class KafkaSinkParserTest { } @Test - void shouldParseBasicKafkaSinkDefinition() throws IOException { + void shouldParseFullKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json"); // when final KafkaSink result = cut.unsafeParse(input); // then - assertThat(result.aafCredentials()).isNull(); + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); assertThat(result.clientId()).isEqualTo("1500462518108"); @@ -84,7 +87,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenStructureIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -99,7 +102,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenTypeIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -112,4 +115,19 @@ class KafkaSinkParserTest { assertThat(error.message()).containsIgnoringCase("message_router"); }); } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json"); + + // when + final Either<StreamParserError, KafkaSink> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java new file mode 100644 index 00000000..d255d99a --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class KafkaSourceParserTest { + + private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSourceParser.class); + } + + @Test + void shouldParseMinimalKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseFullKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); + assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060")); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_config.json index a95b723f..266326f4 100644 --- a/rest-services/cbs-client/src/test/resources/sample_config.json +++ b/rest-services/cbs-client/src/test/resources/sample_config.json @@ -1,3 +1,33 @@ { - "keystore.path": "/var/run/security/keystore.p12" + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } } diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json index 0ee88adb..0ee88adb 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json index b60388d5..e7b45508 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json @@ -1,5 +1,9 @@ { "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, "kafka_info": { "client_role": "com.dcae.member", "client_id": "1500462518108", diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json new file mode 100644 index 00000000..379dbef1 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json @@ -0,0 +1,14 @@ +{ + "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, + "kafka_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,", + "topic_name": "HVVES_PERF3GPP", + "consumer_group_id": "nokia-perf3gpp-processor" + } +} |