diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2022-10-06 15:42:01 +0000 |
---|---|---|
committer | Vijay Venkatesh Kumar <vv770d@att.com> | 2022-10-06 15:43:31 +0000 |
commit | 16ad4df7c231bf9f49df0b2060e156f58f84bd75 (patch) | |
tree | 48d7a02f90069ea3e331f3a304769356c005c8d1 /rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java | |
parent | 446e7a61af43f6ef1f1b9429201834ee86b60c4b (diff) |
Revert "Remove CBS and Consul dependencies from CBSClient lib"kohn
This reverts commit 592f371519d0647b7ae9a0a2fc601c757133cf0d.
This commit belongs to London/master branch only
Change-Id: Ie92be8fa8eda72462d7411646bbef6b52760d29a
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Issue-ID: DCAEGEN2-3236
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Diffstat (limited to 'rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java')
-rw-r--r-- | rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java | 209 |
1 files changed, 207 insertions, 2 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index 57bf9b3d..db881a2e 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -3,7 +3,6 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. - * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +95,64 @@ class CbsClientImplIT { server.close(); } + @Test + void testCbsClientWithSingleCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result.map(this::sampleConfigValue)) + .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithPeriodicCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Flux<JsonObject> result = sut + .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10))); + + // then + final int itemsToTake = 5; + StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue)) + .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE_FROM_CBS).cycle(itemsToTake)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithUpdatesCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + final Duration period = Duration.ofMillis(10); + + // when + final Flux<JsonObject> result = sut + .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period)); + + // then + final Duration timeToCollectItemsFor = period.multipliedBy(50); + StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue)) + .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } @Test void testCbsClientWithConfigRetrievedFromFileMissingEnv() { @@ -134,11 +191,159 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + // TODO: Use these parsers below + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> { + final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json); + + final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA)) + .map(kafkaSinkParser::unsafeParse); + final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER)) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(StreamParsingException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString()); + assertThat(ex).hasMessageContaining(KAFKA.toString()); + }) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithSingleAllRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("config")).isNotNull(); + assertThat(json.get("policies")).isNotNull(); + assertThat(json.get("sampleKey")).isNotNull(); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + + @Test + void testCbsClientWithSingleKeyRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey"); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("key")).isNotNull(); + assertThat(json.get("key").getAsString()).isEqualTo("value"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWhenTheConfigurationWasNotFound() { + // given + final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfigurationCbsSource).withAppName("unknown_app"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .expectError(HttpException.class) + .verify(Duration.ofSeconds(5)); + } @NotNull private static ImmutableCbsClientConfiguration.Builder getConfigBuilder() { return ImmutableCbsClientConfiguration.builder() - .appName("dcae-component"); + .protocol("http") + .appName("dcae-component") + .hostname(server.host()) + .port(server.port()); } private String sampleConfigValue(JsonObject obj) { |