summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt157
1 files changed, 157 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
new file mode 100644
index 00000000..c6364f74
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -0,0 +1,157 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import reactor.test.StepVerifier
+import java.time.Duration
+import java.util.*
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object ConsulConfigurationProviderTest : Spek({
+
+ describe("Consul configuration provider") {
+
+ val httpAdapterMock: HttpAdapter = mock()
+ val healthStateProvider = HealthState.INSTANCE
+
+ given("valid resource url") {
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ .thenReturn(Mono.just(constructConsulResponse()))
+
+ it("should use received configuration") {
+
+ StepVerifier.create(consulConfigProvider().take(1))
+ .consumeNextWith {
+
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertThat(FAULT.domainName)
+ .describedAs("routed domain 1")
+ .isEqualTo(route1.domain)
+ assertThat("test-topic-1")
+ .describedAs("target topic 1")
+ .isEqualTo(route1.targetTopic)
+
+ val route2 = it.routing.routes[1]
+ assertThat(HEARTBEAT.domainName)
+ .describedAs("routed domain 2")
+ .isEqualTo(route2.domain)
+ assertThat("test-topic-2")
+ .describedAs("target topic 2")
+ .isEqualTo(route2.targetTopic)
+
+ }.verifyComplete()
+ }
+ }
+
+ }
+ given("invalid resource url") {
+ val invalidUrl = "http://invalid-url/"
+
+ val iterationCount = 3L
+ val consulConfigProvider = constructConsulConfigProvider(
+ invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ )
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should interrupt the flux") {
+
+ StepVerifier.create(consulConfigProvider())
+ .verifyErrorMessage("Test exception")
+ }
+
+ it("should update the health state") {
+ StepVerifier.create(healthStateProvider().take(iterationCount))
+ .expectNextCount(iterationCount - 1)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .verifyComplete()
+ }
+ }
+ }
+ }
+
+})
+
+private fun constructConsulConfigProvider(url: String,
+ httpAdapter: HttpAdapter,
+ healthState: HealthState,
+ iterationCount: Long = 1
+): ConsulConfigurationProvider {
+
+ val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConsulConfigurationProvider(
+ httpAdapter,
+ url,
+ firstRequestDelay,
+ requestInterval,
+ healthState,
+ retry
+ )
+}
+
+
+const val kafkaAddress = "message-router-kafka"
+
+fun constructConsulResponse(): String =
+ """{
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "collector.routing": [
+ {
+ "fromDomain": "fault",
+ "toTopic": "test-topic-1"
+ },
+ {
+ "fromDomain": "heartbeat",
+ "toTopic": "test-topic-2"
+ }
+ ]
+ }"""