aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--build/hv-collector-coverage/pom.xml5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt44
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt28
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt2
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt154
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt59
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt75
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt127
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt33
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt11
-rw-r--r--sources/hv-collector-kafka-consumer/Dockerfile18
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml95
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt22
-rw-r--r--sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt30
-rw-r--r--sources/pom.xml1
15 files changed, 600 insertions, 104 deletions
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml
index 82356c0b..bfde3ae6 100644
--- a/build/hv-collector-coverage/pom.xml
+++ b/build/hv-collector-coverage/pom.xml
@@ -135,6 +135,11 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-kafka-consumer</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-main</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index e243afe7..35adfe79 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -55,7 +55,6 @@ class ConfigurationModule internal constructor(private val configStateListener:
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
)
-
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
@@ -67,7 +66,7 @@ class ConfigurationModule internal constructor(private val configStateListener:
.doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
- cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+ cbsClientAdapter(basePartialConfig, mdc).let { cbsClientAdapter ->
cbsConfigurationProvider(cbsClientAdapter, mdc)
.invoke()
.map { configMerger.merge(basePartialConfig, it) }
@@ -75,27 +74,28 @@ class ConfigurationModule internal constructor(private val configStateListener:
.throwOnLeft()
.map(configTransformer::toFinalConfiguration)
.doOnNext {
- cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+ cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval)
}
}
}
- private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
- CbsClientAdapter(
- cbsClient,
- configStateListener,
- cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
- retrySpec
- )
+ private fun cbsClientAdapter(basePartialConfig: PartialConfiguration,
+ mdc: MappedDiagnosticContext) = CbsClientAdapter(
+ cbsClient,
+ cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+ configStateListener,
+ mdc,
+ infiniteRetry
+ )
private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
- mdc: MappedDiagnosticContext) =
- CbsConfigurationProvider(
- cbsClientAdapter,
- configParser,
- configStateListener,
- mdc,
- retrySpec)
+ mdc: MappedDiagnosticContext) = CbsConfigurationProvider(
+ cbsClientAdapter,
+ configParser,
+ configStateListener,
+ mdc,
+ infiniteRetry
+ )
private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -104,11 +104,11 @@ class ConfigurationModule internal constructor(private val configStateListener:
companion object {
private val logger = Logger(ConfigurationModule::class)
- private const val MAX_RETRIES = 5L
- private const val INITIAL_BACKOFF = 10L
- private val retrySpec: Retry<Any> = Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+ private val FIRST_BACKOFF_DURATION = Duration.ofSeconds(5)
+ private val MAX_BACKOFF_DURATION = Duration.ofMinutes(5)
+ private val infiniteRetry: Retry<Any> = Retry.any<Any>()
+ .retryMax(Long.MAX_VALUE)
+ .exponentialBackoff(FIRST_BACKOFF_DURATION, MAX_BACKOFF_DURATION)
.jitter(Jitter.random())
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
index d31f6585..8b7ed67f 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcae.collectors.veshv.utils.rx.delayElements
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
@@ -35,26 +34,31 @@ import java.util.concurrent.atomic.AtomicReference
internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
- private val configurationStateListener: ConfigurationStateListener,
private val firstRequestDelay: Duration,
- private val retrySpec: Retry<Any>) {
+ private val configurationStateListener: ConfigurationStateListener,
+ private val mdc: MappedDiagnosticContext,
+ retrySpec: Retry<Any>) {
private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(mdc) {
+ log("Exception while creating CBS client, retrying. Reason: ${it.exception().localizedMessage}")
+ }
+ configurationStateListener.retrying()
+ }
- fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+ fun configurationUpdates() = cbsClientMono
.doOnNext {
logger.info(mdc) {
"CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
}
}
- .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
- .retryWhen(retry(mdc))
+ .retryWhen(retry)
.delayElement(firstRequestDelay)
.flatMapMany(::toPeriodicalConfigurations)
.distinctUntilChanged()
- fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
- requestInterval.set(intervalUpdate)
+ fun updateCbsInterval(intervalUpdate: Duration) = requestInterval.set(intervalUpdate).also {
logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
}
@@ -67,15 +71,7 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
- private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
- logger.withWarn(mdc) {
- log("Exception from HV-VES cbs client, retrying subscription", it.exception())
- }
- configurationStateListener.retrying()
- }
-
companion object {
private val logger = Logger(CbsClientAdapter::class)
}
-
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 6efa38e6..6f16b3d1 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -55,7 +55,7 @@ internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientA
}
operator fun invoke(): Flux<PartialConfiguration> =
- cbsClientAdapter.configurationUpdates(mdc)
+ cbsClientAdapter.configurationUpdates()
.doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
.map(::parseConfiguration)
.doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
index 1b2dbc2b..9303920e 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
@@ -19,41 +19,109 @@
*/
package org.onap.dcae.collectors.veshv.config.api
-import arrow.core.Option
-import com.google.gson.JsonParser
+import arrow.core.Some
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.reset
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.*
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.config.impl.mdc
import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.Duration
internal object ConfigurationModuleIT : Spek({
- describe("configuration module") {
- val cbsClientMock = mock<CbsClient>()
- val configStateListenerMock = mock<ConfigurationStateListener>()
- val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock))
- val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
-
- given("sample configuration in file: $configPath") {
- val arguments = arrayOf(
- "--configuration-file",
- configPath,
- "--health-check-api-port",
- "6062")
+ StepVerifier.setDefaultTimeout(Duration.ofSeconds(5))
+
+ describe("Configuration Module") {
+ val configStateListenerMock: ConfigurationStateListener = mock()
+ val cbsClientMono = Mono.fromSupplier(CbsClientMockSupplier)
+
+ val sut = ConfigurationModule(configStateListenerMock, cbsClientMono)
+
+ beforeEachTest {
+ reset(configStateListenerMock)
+ CbsClientMockSupplier.reset()
+ }
+
+ given("sample configuration in file") {
+ val configurationPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
+
+ val configurationUpdates = sut.hvVesConfigurationUpdates(arguments(configurationPath), mdc)
+
+ on("Config Binding Service permanently not available") {
+ CbsClientMockSupplier.setCbsClientCreationSuccessful(false)
+ val testVirtualDuration = Duration.ofMinutes(10)
+
+ it("should retry as long as possible until failing") {
+ StepVerifier
+ .withVirtualTime { configurationUpdates.last() }
+ .expectSubscription()
+ .expectNoEvent(testVirtualDuration)
+ .thenCancel()
+ .verifyThenAssertThat()
+ .allOperatorErrorsAre(CbsClientMockSupplier.throwedException())
+ }
+
+ it("should notify configuration state listener about each retry") {
+ val requestsAmount = CbsClientMockSupplier.requestsAmount.get()
+ assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0)
+ verify(configStateListenerMock, times(requestsAmount)).retrying()
+ }
+ }
+
+ on("Config Binding Service temporarily not available") {
+ CbsClientMockSupplier.setCbsClientCreationSuccessful(false)
+ val cbsUnavailabilityTime = Duration.ofMinutes(10)
+ whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
+ .thenReturn(Mono.just(configurationJsonWithIntervalChanged))
+
+ it("should return configuration after CBS is available again") {
+ StepVerifier
+ .withVirtualTime { configurationUpdates.take(1) }
+ .expectSubscription()
+ .expectNoEvent(cbsUnavailabilityTime)
+ .then { CbsClientMockSupplier.setCbsClientCreationSuccessful(true) }
+ .thenAwait(MAX_BACKOFF_INTERVAL)
+ .expectNext(configurationWithIntervalChanged)
+ .verifyComplete()
+ }
+ }
+
+ on("failure from CBS client during getting configuration") {
+ val exceptionFromCbsClient = MyCustomTestCbsClientException("I'm such a failure")
+ whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
+ .thenReturn(Mono.error(exceptionFromCbsClient))
+ val testVirtualDuration = Duration.ofMinutes(2)
+
+ it("should retry as long as possible until failing") {
+ StepVerifier
+ .withVirtualTime { configurationUpdates.last() }
+ .expectSubscription()
+ .expectNoEvent(testVirtualDuration)
+ .thenCancel()
+ .verifyThenAssertThat()
+ .allOperatorErrorsAre(exceptionFromCbsClient)
+ }
+
+ it("should notify configuration state listener about each retry") {
+ val requestsAmount = CbsClientMockSupplier.requestsAmount.get()
+ assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0)
+ verify(configStateListenerMock, times(requestsAmount)).retrying()
+ }
+ }
+
on("configuration changes in Config Binding Service") {
- whenever(cbsClientMock.get(any()))
+ whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
.thenReturn(
Mono.just(configurationJsonWithIntervalChanged),
Mono.just(configurationJsonWithIntervalChangedAgain),
@@ -62,10 +130,7 @@ internal object ConfigurationModuleIT : Spek({
it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" +
" fetch configurations in intervals specified within them") {
StepVerifier
- .withVirtualTime {
- sut.hvVesConfigurationUpdates(arguments, sampleMdc)
- .take(3)
- }
+ .withVirtualTime { configurationUpdates.take(3) }
.expectSubscription()
.expectNoEvent(firstRequestDelayFromFile)
.expectNext(configurationWithIntervalChanged)
@@ -80,26 +145,34 @@ internal object ConfigurationModuleIT : Spek({
}
})
+private data class MyCustomTestCbsClientException(val msg: String) : Exception(msg)
+
+private val MAX_BACKOFF_INTERVAL = Duration.ofMinutes(5)
+
+fun StepVerifier.Assertions.allOperatorErrorsAre(ex: Throwable) = hasOperatorErrorsMatching {
+ it.all { tuple -> tuple.t1.get() === ex }
+}
+
+private fun arguments(configurationPath: String) = arrayOf(
+ "--configuration-file",
+ configurationPath,
+ "--health-check-api-port",
+ "6062")
+
private val firstRequestDelayFromFile = Duration.ofSeconds(3)
private val firstRequestDelayFromCBS = Duration.ofSeconds(999)
private val requestIntervalFromCBS = Duration.ofSeconds(10)
private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20)
-private val sampleMdc = { mapOf("k" to "v") }
-private val emptyRouting = listOf<Route>()
+private val configurationJsonWithIntervalChanged =
+ hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS))
-private val configurationJsonWithIntervalChanged = JsonParser().parse("""{
- "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
-}""").asJsonObject
+private val configurationJsonWithIntervalChangedAgain =
+ hvVesConfigurationJson(requestInterval = Some(anotherRequestIntervalFromCBS),
+ firstRequestDelay = Some(firstRequestDelayFromCBS))
-private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{
- "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds},
- "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds}
-}""").asJsonObject
-
-private val configurationJsonWithIntervalRestored = JsonParser().parse("""{
- "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
-}""").asJsonObject
+private val configurationJsonWithIntervalRestored =
+ hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS))
private val configurationWithIntervalChanged =
hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
@@ -110,11 +183,4 @@ private val configurationWithIntervalChangedAgain =
private val configurationWithIntervalRestored =
hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
-private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration {
- return HvVesConfiguration(
- ServerConfiguration(6061, Duration.ofSeconds(60)),
- CbsConfiguration(firstRequestDelay, requestInterval),
- SecurityConfiguration(Option.empty()),
- CollectorConfiguration(emptyRouting, 1024 * 1024),
- LogLevel.DEBUG)
-} \ No newline at end of file
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt
new file mode 100644
index 00000000..2491264e
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.reset
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Supplier
+
+
+internal object CbsClientMockSupplier : Supplier<CbsClient> {
+
+ private val logger = Logger(CbsClientMockSupplier::class)
+ private val cbsClientSupplierException = Exception("Test was configured to fail at client creation.")
+
+ private var shouldEmitError = false
+ val requestsAmount = AtomicInteger(0)
+ val cbsClientMock: CbsClient = mock()
+
+ override fun get(): CbsClient = requestsAmount.incrementAndGet().let {
+ if (shouldEmitError) {
+ throw cbsClientSupplierException
+ } else {
+ cbsClientMock
+ }
+ }
+
+ fun setCbsClientCreationSuccessful(creationSuccessful: Boolean) {
+ logger.trace { "Setting CBS creation success result to : $creationSuccessful" }
+ shouldEmitError = !creationSuccessful
+ }
+
+ fun throwedException(): Throwable = cbsClientSupplierException
+
+ fun reset() {
+ reset(cbsClientMock)
+ setCbsClientCreationSuccessful(true)
+ this.requestsAmount.set(0)
+ }
+}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt
new file mode 100644
index 00000000..8472f3c9
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
+import com.google.gson.JsonParser
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import java.time.Duration
+
+
+internal fun hvVesConfigurationJson(listenPort: Option<Int> = None,
+ idleTimeoutSec: Option<Int> = None,
+ firstRequestDelay: Option<Duration> = None,
+ requestInterval: Option<Duration> = None,
+ logLevel: Option<String> = None,
+ sslDisable: Option<Boolean> = None,
+ keyStoreFilePath: Option<String> = None,
+ keyStorePasswordFilePath: Option<String> = None,
+ trustStoreFilePath: Option<String> = None,
+ trustStorePasswordFilePath: Option<String> = None) = JsonParser().parse(
+ """{
+ ${addKeyIfPresent("logLevel", logLevel)}
+ ${addKeyIfPresent("server.listenPort", listenPort)}
+ ${addKeyIfPresent("server.idleTimeoutSec", idleTimeoutSec)}
+ ${addKeyIfPresent("cbs.firstRequestDelaySec", firstRequestDelay.map { it.seconds })}
+ ${addKeyIfPresent("cbs.requestIntervalSec", requestInterval.map { it.seconds })}
+ ${addKeyIfPresent("security.sslDisable", sslDisable)}
+ ${addKeyIfPresent("security.keys.keyStoreFile", keyStoreFilePath)}
+ ${addKeyIfPresent("security.keys.keyStorePasswordFile", keyStorePasswordFilePath)}
+ ${addKeyIfPresent("security.keys.trustStoreFile", trustStoreFilePath)}
+ ${addKeyIfPresent("security.keys.trustStorePasswordFile", trustStorePasswordFilePath)}
+""".trim().removeSuffix(",") + "}"
+).asJsonObject
+
+private fun <T> addKeyIfPresent(configurationKey: String, option: Option<T>) = option
+ .map { "$configurationKey: $it," }
+ .getOrElse { "" }
+
+
+private val emptyRouting = listOf<Route>()
+
+internal fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration) =
+ HvVesConfiguration(
+ ServerConfiguration(6061, Duration.ofSeconds(60)),
+ CbsConfiguration(firstRequestDelay, requestInterval),
+ SecurityConfiguration(Option.empty()),
+ CollectorConfiguration(emptyRouting, 1024 * 1024),
+ LogLevel.DEBUG)
+
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt
new file mode 100644
index 00000000..1f6a2538
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.google.gson.JsonParser
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.time.Duration
+
+internal object CbsClientAdapterTest : Spek({
+
+ describe("Config Binding Service Client Adapter") {
+
+ val cbsClientMock: CbsClient = mock()
+ val configStateListener: ConfigurationStateListener = mock()
+
+ given("successful client creation") {
+ val cbsClientMono = Mono.just(cbsClientMock)
+ val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc, retry())
+
+ on("configurations stream in CBS") {
+ val firstConfigurationContent = "first"
+ val secondConfigurationContent = "second"
+ whenever(cbsClientMock.get(any())).thenReturn(
+ configurationMono(firstConfigurationContent),
+ configurationMono(secondConfigurationContent)
+ )
+
+ it("should return flux of fetched configurations") {
+ StepVerifier
+ .withVirtualTime {
+ cut.configurationUpdates().take(2)
+ }
+ .expectSubscription()
+ .expectNoEvent(firstRequestDelay)
+ .expectNext(configuration(firstConfigurationContent))
+ .expectNext(configuration(secondConfigurationContent))
+ .verifyComplete()
+ }
+ }
+
+
+ on("exception from CBS client on configuration fetch") {
+
+ whenever(cbsClientMock.get(any())).thenReturn(
+ Mono.error { sampleException }
+ )
+
+ it("should return error flux") {
+ StepVerifier.create(cut.configurationUpdates())
+ .expectErrorMatches { it === sampleException }
+ .verify()
+ }
+ }
+ }
+
+ given("repeated failure during client creation") {
+ val failedCreationsAmount = 3
+ var currentFailuresCount = 0
+ val cbsClientMono = Mono.fromCallable {
+ currentFailuresCount++
+ if (currentFailuresCount <= failedCreationsAmount) {
+ throw sampleException
+ } else {
+ cbsClientMock
+ }
+ }
+
+ val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc,
+ retry(failedCreationsAmount + 1L))
+
+ on("CBS client creation") {
+ whenever(cbsClientMock.get(any())).thenReturn(configurationMono())
+
+ it("it should emit configuration after failures") {
+ StepVerifier
+ .withVirtualTime { cut.configurationUpdates().take(1) }
+ .expectSubscription()
+ .expectNoEvent(firstRequestDelay)
+ .expectNext(configuration())
+ .verifyComplete()
+ }
+
+ it("should call state listener when retrying") {
+ verify(configStateListener, times(failedCreationsAmount)).retrying()
+ }
+ }
+ }
+ }
+})
+
+private val firstRequestDelay = Duration.ofSeconds(10)
+private val sampleException = Exception("Best regards from CBS")
+
+private fun configuration(content: String = "whatever") =
+ JsonParser().parse("""{ "content": ${content} }""").asJsonObject
+
+private fun configurationMono(content: String = "whatever") = Mono.just(configuration(content))
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index 31415454..0954b76e 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Some
import com.google.gson.JsonParser
-import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.times
import com.nhaarman.mockitokotlin2.verify
@@ -49,11 +48,11 @@ internal object CbsConfigurationProviderTest : Spek({
describe("Configuration provider") {
- val cbsClientAdapter = mock<CbsClientAdapter>()
- val configStateListener = mock<ConfigurationStateListener>()
+ val cbsClientAdapter: CbsClientAdapter = mock()
+ val configStateListener: ConfigurationStateListener = mock()
given("configuration is never in cbs") {
- val cbsClientMock = mock<CbsClient>()
+ val cbsClientMock: CbsClient = mock()
val configProvider = constructConfigurationProvider(
constructCbsClientAdapter(cbsClientMock, configStateListener),
configStateListener
@@ -73,7 +72,7 @@ internal object CbsConfigurationProviderTest : Spek({
val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)
on("new configuration") {
- whenever(cbsClientAdapter.configurationUpdates(any()))
+ whenever(cbsClientAdapter.configurationUpdates())
.thenReturn(Flux.just(validConfiguration))
it("should use received configuration") {
@@ -110,7 +109,7 @@ internal object CbsConfigurationProviderTest : Spek({
)
on("new configuration") {
- whenever(cbsClientAdapter.configurationUpdates(any()))
+ whenever(cbsClientAdapter.configurationUpdates())
.thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
@@ -193,21 +192,15 @@ private val invalidConfiguration = JsonParser().parse("""
private val firstRequestDelay = Duration.ofMillis(1)
private val configParser = JsonConfigurationParser()
-private fun retry(iterationCount: Long = 1) = Retry
- .onlyIf<Any> { it.iteration() <= iterationCount }
- .fixedBackoff(Duration.ofNanos(1))
-
private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) =
- CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry())
+ CbsClientAdapter(Mono.just(cbsClientMock), firstRequestDelay, configStateListener, mdc, retry())
private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
configurationStateListener: ConfigurationStateListener,
- iterationCount: Long = 1
-): CbsConfigurationProvider =
- CbsConfigurationProvider(
- cbsClientAdapter,
- configParser,
- configurationStateListener,
- { mapOf("k" to "v") },
- retry(iterationCount)
- )
+ iterationCount: Long = 1) = CbsConfigurationProvider(
+ cbsClientAdapter,
+ configParser,
+ configurationStateListener,
+ mdc,
+ retry(iterationCount)
+)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
index f07af079..d2b56b66 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
@@ -24,7 +24,9 @@ import com.nhaarman.mockitokotlin2.whenever
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.retry.Retry
import java.nio.file.Paths
+import java.time.Duration
private fun resourcePathAsString(resource: String) =
Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
@@ -51,4 +53,11 @@ private val sampleSink = mock<KafkaSink>().also {
}
internal val sampleStreamsDefinition = listOf(sampleSink)
-internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink)) \ No newline at end of file
+internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
+
+internal val mdc = { mapOf("mdc_key" to "mdc_value") }
+
+internal fun retry(iterationCount: Long = 1) = Retry
+ .onlyIf<Any> { it.iteration() <= iterationCount }
+ .fixedBackoff(Duration.ofNanos(1))
+
diff --git a/sources/hv-collector-kafka-consumer/Dockerfile b/sources/hv-collector-kafka-consumer/Dockerfile
new file mode 100644
index 00000000..aed9680c
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/Dockerfile
@@ -0,0 +1,18 @@
+FROM docker.io/openjdk:11-jre-slim
+
+LABEL copyright="Copyright (C) 2019 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends curl \
+ && apt-get clean
+
+WORKDIR /opt/hv-ves-kafka-consumer
+
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.kafkaconsumer.MainKt"]
+
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-kafka-consumer-*.jar ./
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
new file mode 100644
index 00000000..45a32729
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+
+ <artifactId>hv-collector-kafka-consumer</artifactId>
+
+ <description>VES HighVolume Collector :: Kafka consumer</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-commandline</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
new file mode 100644
index 00000000..fa15587c
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer
+
+fun main(args: Array<String>) = println("Guten tag")
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
new file mode 100644
index 00000000..b7ea126f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import kotlin.test.assertTrue
+
+object SampleTest : Spek({
+ describe("sample test") {
+ assertTrue(true)
+ }
+})
diff --git a/sources/pom.xml b/sources/pom.xml
index 81bf3017..c7ba4886 100644
--- a/sources/pom.xml
+++ b/sources/pom.xml
@@ -142,6 +142,7 @@
<module>hv-collector-dcae-app-simulator</module>
<module>hv-collector-domain</module>
<module>hv-collector-health-check</module>
+ <module>hv-collector-kafka-consumer</module>
<module>hv-collector-main</module>
<module>hv-collector-server</module>
<module>hv-collector-ssl</module>