diff options
15 files changed, 600 insertions, 104 deletions
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml index 82356c0b..bfde3ae6 100644 --- a/build/hv-collector-coverage/pom.xml +++ b/build/hv-collector-coverage/pom.xml @@ -135,6 +135,11 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-kafka-consumer</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-main</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index e243afe7..35adfe79 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -55,7 +55,6 @@ class ConfigurationModule internal constructor(private val configStateListener: CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()) ) - fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array<String>, @@ -67,7 +66,7 @@ class ConfigurationModule internal constructor(private val configStateListener: .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> - cbsClientAdapter(basePartialConfig).let { cbsClientAdapter -> + cbsClientAdapter(basePartialConfig, mdc).let { cbsClientAdapter -> cbsConfigurationProvider(cbsClientAdapter, mdc) .invoke() .map { configMerger.merge(basePartialConfig, it) } @@ -75,27 +74,28 @@ class ConfigurationModule internal constructor(private val configStateListener: .throwOnLeft() .map(configTransformer::toFinalConfiguration) .doOnNext { - cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc) + cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval) } } } - private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) = - CbsClientAdapter( - cbsClient, - configStateListener, - cbsConfigurationFrom(basePartialConfig).firstRequestDelay, - retrySpec - ) + private fun cbsClientAdapter(basePartialConfig: PartialConfiguration, + mdc: MappedDiagnosticContext) = CbsClientAdapter( + cbsClient, + cbsConfigurationFrom(basePartialConfig).firstRequestDelay, + configStateListener, + mdc, + infiniteRetry + ) private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter, - mdc: MappedDiagnosticContext) = - CbsConfigurationProvider( - cbsClientAdapter, - configParser, - configStateListener, - mdc, - retrySpec) + mdc: MappedDiagnosticContext) = CbsConfigurationProvider( + cbsClientAdapter, + configParser, + configStateListener, + mdc, + infiniteRetry + ) private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator.validatedCbsConfiguration(basePartialConfig) @@ -104,11 +104,11 @@ class ConfigurationModule internal constructor(private val configStateListener: companion object { private val logger = Logger(ConfigurationModule::class) - private const val MAX_RETRIES = 5L - private const val INITIAL_BACKOFF = 10L - private val retrySpec: Retry<Any> = Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF)) + private val FIRST_BACKOFF_DURATION = Duration.ofSeconds(5) + private val MAX_BACKOFF_DURATION = Duration.ofMinutes(5) + private val infiniteRetry: Retry<Any> = Retry.any<Any>() + .retryMax(Long.MAX_VALUE) + .exponentialBackoff(FIRST_BACKOFF_DURATION, MAX_BACKOFF_DURATION) .jitter(Jitter.random()) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt index d31f6585..8b7ed67f 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext -import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcae.collectors.veshv.utils.rx.delayElements import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests @@ -35,26 +34,31 @@ import java.util.concurrent.atomic.AtomicReference internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>, - private val configurationStateListener: ConfigurationStateListener, private val firstRequestDelay: Duration, - private val retrySpec: Retry<Any>) { + private val configurationStateListener: ConfigurationStateListener, + private val mdc: MappedDiagnosticContext, + retrySpec: Retry<Any>) { private val requestInterval = AtomicReference<Duration>(Duration.ZERO) + private val retry = retrySpec.doOnRetry { + logger.withWarn(mdc) { + log("Exception while creating CBS client, retrying. Reason: ${it.exception().localizedMessage}") + } + configurationStateListener.retrying() + } - fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono + fun configurationUpdates() = cbsClientMono .doOnNext { logger.info(mdc) { "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s" } } - .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" } - .retryWhen(retry(mdc)) + .retryWhen(retry) .delayElement(firstRequestDelay) .flatMapMany(::toPeriodicalConfigurations) .distinctUntilChanged() - fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) { - requestInterval.set(intervalUpdate) + fun updateCbsInterval(intervalUpdate: Duration) = requestInterval.set(intervalUpdate).also { logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" } } @@ -67,15 +71,7 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>, private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) - private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry { - logger.withWarn(mdc) { - log("Exception from HV-VES cbs client, retrying subscription", it.exception()) - } - configurationStateListener.retrying() - } - companion object { private val logger = Logger(CbsClientAdapter::class) } - } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt index 6efa38e6..6f16b3d1 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt @@ -55,7 +55,7 @@ internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientA } operator fun invoke(): Flux<PartialConfiguration> = - cbsClientAdapter.configurationUpdates(mdc) + cbsClientAdapter.configurationUpdates() .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } } .map(::parseConfiguration) .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt index 1b2dbc2b..9303920e 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt @@ -19,41 +19,109 @@ */ package org.onap.dcae.collectors.veshv.config.api -import arrow.core.Option -import com.google.gson.JsonParser +import arrow.core.Some import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.reset +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.* -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.config.impl.mdc import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import reactor.core.publisher.Mono import reactor.test.StepVerifier import java.time.Duration internal object ConfigurationModuleIT : Spek({ - describe("configuration module") { - val cbsClientMock = mock<CbsClient>() - val configStateListenerMock = mock<ConfigurationStateListener>() - val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock)) - val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json") - - given("sample configuration in file: $configPath") { - val arguments = arrayOf( - "--configuration-file", - configPath, - "--health-check-api-port", - "6062") + StepVerifier.setDefaultTimeout(Duration.ofSeconds(5)) + + describe("Configuration Module") { + val configStateListenerMock: ConfigurationStateListener = mock() + val cbsClientMono = Mono.fromSupplier(CbsClientMockSupplier) + + val sut = ConfigurationModule(configStateListenerMock, cbsClientMono) + + beforeEachTest { + reset(configStateListenerMock) + CbsClientMockSupplier.reset() + } + + given("sample configuration in file") { + val configurationPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json") + + val configurationUpdates = sut.hvVesConfigurationUpdates(arguments(configurationPath), mdc) + + on("Config Binding Service permanently not available") { + CbsClientMockSupplier.setCbsClientCreationSuccessful(false) + val testVirtualDuration = Duration.ofMinutes(10) + + it("should retry as long as possible until failing") { + StepVerifier + .withVirtualTime { configurationUpdates.last() } + .expectSubscription() + .expectNoEvent(testVirtualDuration) + .thenCancel() + .verifyThenAssertThat() + .allOperatorErrorsAre(CbsClientMockSupplier.throwedException()) + } + + it("should notify configuration state listener about each retry") { + val requestsAmount = CbsClientMockSupplier.requestsAmount.get() + assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0) + verify(configStateListenerMock, times(requestsAmount)).retrying() + } + } + + on("Config Binding Service temporarily not available") { + CbsClientMockSupplier.setCbsClientCreationSuccessful(false) + val cbsUnavailabilityTime = Duration.ofMinutes(10) + whenever(CbsClientMockSupplier.cbsClientMock.get(any())) + .thenReturn(Mono.just(configurationJsonWithIntervalChanged)) + + it("should return configuration after CBS is available again") { + StepVerifier + .withVirtualTime { configurationUpdates.take(1) } + .expectSubscription() + .expectNoEvent(cbsUnavailabilityTime) + .then { CbsClientMockSupplier.setCbsClientCreationSuccessful(true) } + .thenAwait(MAX_BACKOFF_INTERVAL) + .expectNext(configurationWithIntervalChanged) + .verifyComplete() + } + } + + on("failure from CBS client during getting configuration") { + val exceptionFromCbsClient = MyCustomTestCbsClientException("I'm such a failure") + whenever(CbsClientMockSupplier.cbsClientMock.get(any())) + .thenReturn(Mono.error(exceptionFromCbsClient)) + val testVirtualDuration = Duration.ofMinutes(2) + + it("should retry as long as possible until failing") { + StepVerifier + .withVirtualTime { configurationUpdates.last() } + .expectSubscription() + .expectNoEvent(testVirtualDuration) + .thenCancel() + .verifyThenAssertThat() + .allOperatorErrorsAre(exceptionFromCbsClient) + } + + it("should notify configuration state listener about each retry") { + val requestsAmount = CbsClientMockSupplier.requestsAmount.get() + assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0) + verify(configStateListenerMock, times(requestsAmount)).retrying() + } + } + on("configuration changes in Config Binding Service") { - whenever(cbsClientMock.get(any())) + whenever(CbsClientMockSupplier.cbsClientMock.get(any())) .thenReturn( Mono.just(configurationJsonWithIntervalChanged), Mono.just(configurationJsonWithIntervalChangedAgain), @@ -62,10 +130,7 @@ internal object ConfigurationModuleIT : Spek({ it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" + " fetch configurations in intervals specified within them") { StepVerifier - .withVirtualTime { - sut.hvVesConfigurationUpdates(arguments, sampleMdc) - .take(3) - } + .withVirtualTime { configurationUpdates.take(3) } .expectSubscription() .expectNoEvent(firstRequestDelayFromFile) .expectNext(configurationWithIntervalChanged) @@ -80,26 +145,34 @@ internal object ConfigurationModuleIT : Spek({ } }) +private data class MyCustomTestCbsClientException(val msg: String) : Exception(msg) + +private val MAX_BACKOFF_INTERVAL = Duration.ofMinutes(5) + +fun StepVerifier.Assertions.allOperatorErrorsAre(ex: Throwable) = hasOperatorErrorsMatching { + it.all { tuple -> tuple.t1.get() === ex } +} + +private fun arguments(configurationPath: String) = arrayOf( + "--configuration-file", + configurationPath, + "--health-check-api-port", + "6062") + private val firstRequestDelayFromFile = Duration.ofSeconds(3) private val firstRequestDelayFromCBS = Duration.ofSeconds(999) private val requestIntervalFromCBS = Duration.ofSeconds(10) private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20) -private val sampleMdc = { mapOf("k" to "v") } -private val emptyRouting = listOf<Route>() +private val configurationJsonWithIntervalChanged = + hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS)) -private val configurationJsonWithIntervalChanged = JsonParser().parse("""{ - "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds} -}""").asJsonObject +private val configurationJsonWithIntervalChangedAgain = + hvVesConfigurationJson(requestInterval = Some(anotherRequestIntervalFromCBS), + firstRequestDelay = Some(firstRequestDelayFromCBS)) -private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{ - "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds}, - "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds} -}""").asJsonObject - -private val configurationJsonWithIntervalRestored = JsonParser().parse("""{ - "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds} -}""").asJsonObject +private val configurationJsonWithIntervalRestored = + hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS)) private val configurationWithIntervalChanged = hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS) @@ -110,11 +183,4 @@ private val configurationWithIntervalChangedAgain = private val configurationWithIntervalRestored = hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS) -private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration { - return HvVesConfiguration( - ServerConfiguration(6061, Duration.ofSeconds(60)), - CbsConfiguration(firstRequestDelay, requestInterval), - SecurityConfiguration(Option.empty()), - CollectorConfiguration(emptyRouting, 1024 * 1024), - LogLevel.DEBUG) -}
\ No newline at end of file + diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt new file mode 100644 index 00000000..2491264e --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.reset +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Supplier + + +internal object CbsClientMockSupplier : Supplier<CbsClient> { + + private val logger = Logger(CbsClientMockSupplier::class) + private val cbsClientSupplierException = Exception("Test was configured to fail at client creation.") + + private var shouldEmitError = false + val requestsAmount = AtomicInteger(0) + val cbsClientMock: CbsClient = mock() + + override fun get(): CbsClient = requestsAmount.incrementAndGet().let { + if (shouldEmitError) { + throw cbsClientSupplierException + } else { + cbsClientMock + } + } + + fun setCbsClientCreationSuccessful(creationSuccessful: Boolean) { + logger.trace { "Setting CBS creation success result to : $creationSuccessful" } + shouldEmitError = !creationSuccessful + } + + fun throwedException(): Throwable = cbsClientSupplierException + + fun reset() { + reset(cbsClientMock) + setCbsClientCreationSuccessful(true) + this.requestsAmount.set(0) + } +} diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt new file mode 100644 index 00000000..8472f3c9 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api + +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse +import com.google.gson.JsonParser +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.time.Duration + + +internal fun hvVesConfigurationJson(listenPort: Option<Int> = None, + idleTimeoutSec: Option<Int> = None, + firstRequestDelay: Option<Duration> = None, + requestInterval: Option<Duration> = None, + logLevel: Option<String> = None, + sslDisable: Option<Boolean> = None, + keyStoreFilePath: Option<String> = None, + keyStorePasswordFilePath: Option<String> = None, + trustStoreFilePath: Option<String> = None, + trustStorePasswordFilePath: Option<String> = None) = JsonParser().parse( + """{ + ${addKeyIfPresent("logLevel", logLevel)} + ${addKeyIfPresent("server.listenPort", listenPort)} + ${addKeyIfPresent("server.idleTimeoutSec", idleTimeoutSec)} + ${addKeyIfPresent("cbs.firstRequestDelaySec", firstRequestDelay.map { it.seconds })} + ${addKeyIfPresent("cbs.requestIntervalSec", requestInterval.map { it.seconds })} + ${addKeyIfPresent("security.sslDisable", sslDisable)} + ${addKeyIfPresent("security.keys.keyStoreFile", keyStoreFilePath)} + ${addKeyIfPresent("security.keys.keyStorePasswordFile", keyStorePasswordFilePath)} + ${addKeyIfPresent("security.keys.trustStoreFile", trustStoreFilePath)} + ${addKeyIfPresent("security.keys.trustStorePasswordFile", trustStorePasswordFilePath)} +""".trim().removeSuffix(",") + "}" +).asJsonObject + +private fun <T> addKeyIfPresent(configurationKey: String, option: Option<T>) = option + .map { "$configurationKey: $it," } + .getOrElse { "" } + + +private val emptyRouting = listOf<Route>() + +internal fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration) = + HvVesConfiguration( + ServerConfiguration(6061, Duration.ofSeconds(60)), + CbsConfiguration(firstRequestDelay, requestInterval), + SecurityConfiguration(Option.empty()), + CollectorConfiguration(emptyRouting, 1024 * 1024), + LogLevel.DEBUG) + + diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt new file mode 100644 index 00000000..1f6a2538 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.google.gson.JsonParser +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import reactor.core.publisher.Mono +import reactor.test.StepVerifier +import java.time.Duration + +internal object CbsClientAdapterTest : Spek({ + + describe("Config Binding Service Client Adapter") { + + val cbsClientMock: CbsClient = mock() + val configStateListener: ConfigurationStateListener = mock() + + given("successful client creation") { + val cbsClientMono = Mono.just(cbsClientMock) + val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc, retry()) + + on("configurations stream in CBS") { + val firstConfigurationContent = "first" + val secondConfigurationContent = "second" + whenever(cbsClientMock.get(any())).thenReturn( + configurationMono(firstConfigurationContent), + configurationMono(secondConfigurationContent) + ) + + it("should return flux of fetched configurations") { + StepVerifier + .withVirtualTime { + cut.configurationUpdates().take(2) + } + .expectSubscription() + .expectNoEvent(firstRequestDelay) + .expectNext(configuration(firstConfigurationContent)) + .expectNext(configuration(secondConfigurationContent)) + .verifyComplete() + } + } + + + on("exception from CBS client on configuration fetch") { + + whenever(cbsClientMock.get(any())).thenReturn( + Mono.error { sampleException } + ) + + it("should return error flux") { + StepVerifier.create(cut.configurationUpdates()) + .expectErrorMatches { it === sampleException } + .verify() + } + } + } + + given("repeated failure during client creation") { + val failedCreationsAmount = 3 + var currentFailuresCount = 0 + val cbsClientMono = Mono.fromCallable { + currentFailuresCount++ + if (currentFailuresCount <= failedCreationsAmount) { + throw sampleException + } else { + cbsClientMock + } + } + + val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc, + retry(failedCreationsAmount + 1L)) + + on("CBS client creation") { + whenever(cbsClientMock.get(any())).thenReturn(configurationMono()) + + it("it should emit configuration after failures") { + StepVerifier + .withVirtualTime { cut.configurationUpdates().take(1) } + .expectSubscription() + .expectNoEvent(firstRequestDelay) + .expectNext(configuration()) + .verifyComplete() + } + + it("should call state listener when retrying") { + verify(configStateListener, times(failedCreationsAmount)).retrying() + } + } + } + } +}) + +private val firstRequestDelay = Duration.ofSeconds(10) +private val sampleException = Exception("Best regards from CBS") + +private fun configuration(content: String = "whatever") = + JsonParser().parse("""{ "content": ${content} }""").asJsonObject + +private fun configurationMono(content: String = "whatever") = Mono.just(configuration(content)) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index 31415454..0954b76e 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Some import com.google.gson.JsonParser -import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.times import com.nhaarman.mockitokotlin2.verify @@ -49,11 +48,11 @@ internal object CbsConfigurationProviderTest : Spek({ describe("Configuration provider") { - val cbsClientAdapter = mock<CbsClientAdapter>() - val configStateListener = mock<ConfigurationStateListener>() + val cbsClientAdapter: CbsClientAdapter = mock() + val configStateListener: ConfigurationStateListener = mock() given("configuration is never in cbs") { - val cbsClientMock = mock<CbsClient>() + val cbsClientMock: CbsClient = mock() val configProvider = constructConfigurationProvider( constructCbsClientAdapter(cbsClientMock, configStateListener), configStateListener @@ -73,7 +72,7 @@ internal object CbsConfigurationProviderTest : Spek({ val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener) on("new configuration") { - whenever(cbsClientAdapter.configurationUpdates(any())) + whenever(cbsClientAdapter.configurationUpdates()) .thenReturn(Flux.just(validConfiguration)) it("should use received configuration") { @@ -110,7 +109,7 @@ internal object CbsConfigurationProviderTest : Spek({ ) on("new configuration") { - whenever(cbsClientAdapter.configurationUpdates(any())) + whenever(cbsClientAdapter.configurationUpdates()) .thenReturn(Flux.just(invalidConfiguration)) it("should interrupt the flux") { @@ -193,21 +192,15 @@ private val invalidConfiguration = JsonParser().parse(""" private val firstRequestDelay = Duration.ofMillis(1) private val configParser = JsonConfigurationParser() -private fun retry(iterationCount: Long = 1) = Retry - .onlyIf<Any> { it.iteration() <= iterationCount } - .fixedBackoff(Duration.ofNanos(1)) - private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) = - CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry()) + CbsClientAdapter(Mono.just(cbsClientMock), firstRequestDelay, configStateListener, mdc, retry()) private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter, configurationStateListener: ConfigurationStateListener, - iterationCount: Long = 1 -): CbsConfigurationProvider = - CbsConfigurationProvider( - cbsClientAdapter, - configParser, - configurationStateListener, - { mapOf("k" to "v") }, - retry(iterationCount) - ) + iterationCount: Long = 1) = CbsConfigurationProvider( + cbsClientAdapter, + configParser, + configurationStateListener, + mdc, + retry(iterationCount) +) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt index f07af079..d2b56b66 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt @@ -24,7 +24,9 @@ import com.nhaarman.mockitokotlin2.whenever import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.retry.Retry import java.nio.file.Paths +import java.time.Duration private fun resourcePathAsString(resource: String) = Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString() @@ -51,4 +53,11 @@ private val sampleSink = mock<KafkaSink>().also { } internal val sampleStreamsDefinition = listOf(sampleSink) -internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file +internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink)) + +internal val mdc = { mapOf("mdc_key" to "mdc_value") } + +internal fun retry(iterationCount: Long = 1) = Retry + .onlyIf<Any> { it.iteration() <= iterationCount } + .fixedBackoff(Duration.ofNanos(1)) + diff --git a/sources/hv-collector-kafka-consumer/Dockerfile b/sources/hv-collector-kafka-consumer/Dockerfile new file mode 100644 index 00000000..aed9680c --- /dev/null +++ b/sources/hv-collector-kafka-consumer/Dockerfile @@ -0,0 +1,18 @@ +FROM docker.io/openjdk:11-jre-slim + +LABEL copyright="Copyright (C) 2019 NOKIA" +LABEL license.name="The Apache Software License, Version 2.0" +LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" +LABEL maintainer="Nokia Wroclaw ONAP Team" + +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl \ + && apt-get clean + +WORKDIR /opt/hv-ves-kafka-consumer + +ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.kafkaconsumer.MainKt"] + +COPY target/libs/external/* ./ +COPY target/libs/internal/* ./ +COPY target/hv-collector-kafka-consumer-*.jar ./ diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml new file mode 100644 index 00000000..45a32729 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -0,0 +1,95 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>hv-collector-kafka-consumer</artifactId> + + <description>VES HighVolume Collector :: Kafka consumer</description> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>docker</id> + <activation> + <property> + <name>!skipDocker</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-commandline</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> + + </dependencies> +</project> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt new file mode 100644 index 00000000..fa15587c --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt @@ -0,0 +1,22 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer + +fun main(args: Array<String>) = println("Guten tag") diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt new file mode 100644 index 00000000..b7ea126f --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import kotlin.test.assertTrue + +object SampleTest : Spek({ + describe("sample test") { + assertTrue(true) + } +}) diff --git a/sources/pom.xml b/sources/pom.xml index 81bf3017..c7ba4886 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -142,6 +142,7 @@ <module>hv-collector-dcae-app-simulator</module> <module>hv-collector-domain</module> <module>hv-collector-health-check</module> + <module>hv-collector-kafka-consumer</module> <module>hv-collector-main</module> <module>hv-collector-server</module> <module>hv-collector-ssl</module> |