diff options
Diffstat (limited to 'rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl')
3 files changed, 338 insertions, 2 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index 57bf9b3d..db881a2e 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -3,7 +3,6 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. - * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +95,64 @@ class CbsClientImplIT { server.close(); } + @Test + void testCbsClientWithSingleCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result.map(this::sampleConfigValue)) + .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithPeriodicCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Flux<JsonObject> result = sut + .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10))); + + // then + final int itemsToTake = 5; + StepVerifier.create(result.take(itemsToTake).map(this::sampleConfigValue)) + .expectNextSequence(Stream.of(EXPECTED_CONFIG_VALUE_FROM_CBS).cycle(itemsToTake)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithUpdatesCall() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + final Duration period = Duration.ofMillis(10); + + // when + final Flux<JsonObject> result = sut + .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period)); + + // then + final Duration timeToCollectItemsFor = period.multipliedBy(50); + StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue)) + .expectNext(EXPECTED_CONFIG_VALUE_FROM_CBS) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } @Test void testCbsClientWithConfigRetrievedFromFileMissingEnv() { @@ -134,11 +191,159 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + // TODO: Use these parsers below + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> { + final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json); + + final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA)) + .map(kafkaSinkParser::unsafeParse); + final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER)) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + envs.set("AAF_USER", "admin"); + envs.set("AAF_PASSWORD", "admin_secret"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(StreamParsingException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString()); + assertThat(ex).hasMessageContaining(KAFKA.toString()); + }) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithSingleAllRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("config")).isNotNull(); + assertThat(json.get("policies")).isNotNull(); + assertThat(json.get("sampleKey")).isNotNull(); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + + @Test + void testCbsClientWithSingleKeyRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleConfigurationCbsSource); + final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey"); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("key")).isNotNull(); + assertThat(json.get("key").getAsString()).isEqualTo("value"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWhenTheConfigurationWasNotFound() { + // given + final CbsClientConfiguration unknownAppEnv = ImmutableCbsClientConfiguration.copyOf(sampleConfigurationCbsSource).withAppName("unknown_app"); + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .expectError(HttpException.class) + .verify(Duration.ofSeconds(5)); + } @NotNull private static ImmutableCbsClientConfiguration.Builder getConfigBuilder() { return ImmutableCbsClientConfiguration.builder() - .appName("dcae-component"); + .protocol("http") + .appName("dcae-component") + .hostname(server.host()) + .port(server.port()); } private String sampleConfigValue(JsonObject obj) { diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientRestTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientRestTest.java new file mode 100644 index 00000000..6368fbac --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientRestTest.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019-2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; + +import com.google.gson.JsonObject; +import io.vavr.collection.HashMultimap; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since February 2019 + */ +class CbsClientRestTest { + private final RxHttpClient httpClient = mock(RxHttpClient.class); + + @Test + void shouldFetchUsingProperUrl() { + // given + InetSocketAddress cbsAddress = InetSocketAddress.createUnresolved("cbshost", 6969); + String serviceName = "dcaegen2-ves-collector"; + final CbsClient cut = new CbsClientRest(httpClient, serviceName, cbsAddress, "http"); + final HttpResponse httpResponse = ImmutableHttpResponse.builder() + .url("http://xxx") + .statusCode(200) + .rawBody("{}".getBytes()) + .headers(HashMultimap.withSeq().empty()) + .build(); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final JsonObject result = cut.get(CbsRequests.getConfiguration(diagnosticContext)).block(); + + // then + final String expectedUrl = "http://cbshost:6969/service_component/dcaegen2-ves-collector"; + verify(httpClient).call(ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(expectedUrl) + .diagnosticContext(diagnosticContext) + .build()); + assertThat(result.toString()).isEqualTo(httpResponse.bodyAsString()); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java new file mode 100644 index 00000000..70f31c8b --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; + +import java.net.InetSocketAddress; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since February 2019 + */ +class CbsLookupTest { + + private static final String cbsHostname = "cbs-service"; + private static final int cbsPort = 10000; + private final CbsClientConfiguration configuration = ImmutableCbsClientConfiguration.builder() + .hostname(cbsHostname) + .port(cbsPort) + .appName("whatever").build(); + private final CbsLookup cut = new CbsLookup(); + + @Test + void lookupShouldReturnValidSocketAddressFromEnvironment() { + // when + final InetSocketAddress result = cut.lookup(configuration).block(); + + // then + assertThat(result.getHostString()).isEqualTo(cbsHostname); + assertThat(result.getPort()).isEqualTo(cbsPort); + } +}
\ No newline at end of file |