summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt3
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt3
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt1
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt3
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt1
-rw-r--r--sources/hv-collector-configuration/src/test/resources/sampleConfig.json1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt65
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt1
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt1
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt22
-rw-r--r--sources/hv-collector-main/src/main/docker/base.json1
12 files changed, 14 insertions, 93 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
index 566f2c08..3375821e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
@@ -49,6 +49,5 @@ data class CbsConfiguration(
data class CollectorConfiguration(
val maxRequestSizeBytes: Int,
val kafkaServers: String,
- val routing: Routing,
- val dummyMode: Boolean = false
+ val routing: Routing
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 90be3dbd..330ec7c1 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -97,8 +97,7 @@ internal class ConfigurationValidator {
CollectorConfiguration(
it.maxRequestSizeBytes.bind(),
toKafkaServersString(it.kafkaServers.bind()),
- it.routing.bind(),
- it.dummyMode.bind()
+ it.routing.bind()
)
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
index 3e6df3e0..3cedd6b5 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
@@ -52,7 +52,6 @@ internal data class PartialCbsConfig(
internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
- val dummyMode: Option<Boolean> = None,
val maxRequestSizeBytes: Option<Int> = None,
val kafkaServers: Option<List<InetSocketAddress>> = None,
val routing: Option<Routing> = None
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index beb5df61..696f42a2 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -64,7 +64,6 @@ internal object ConfigurationValidatorTest : Spek({
Some(mock())
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
Some(routing { }.build())
@@ -105,7 +104,6 @@ internal object ConfigurationValidatorTest : Spek({
securityKeys
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
Some(routing)
@@ -156,7 +154,6 @@ internal object ConfigurationValidatorTest : Spek({
securityKeys
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
Some(routing)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
index 8267e304..bbf259c7 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
@@ -145,7 +145,6 @@ internal object FileConfigurationReaderTest : Spek({
assertThat(config.collector.nonEmpty()).isTrue()
val collector = config.collector.orNull() as PartialCollectorConfig
collector.run {
- assertThat(dummyMode).isEqualTo(Some(false))
assertThat(maxRequestSizeBytes).isEqualTo(Some(512000))
assertThat(kafkaServers.nonEmpty()).isTrue()
assertThat(routing.nonEmpty()).isTrue()
diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
index b49085e8..5ae9fc02 100644
--- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
+++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
@@ -20,7 +20,6 @@
}
},
"collector": {
- "dummyMode": false,
"maxRequestSizeBytes": 512000,
"kafkaServers": [
"192.168.255.1:5005",
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index c362020e..10fe0a51 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -33,10 +33,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
*/
object AdapterFactory {
fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
- if (config.dummyMode)
- LoggingSinkProvider()
- else
- KafkaSinkProvider(config)
+ KafkaSinkProvider(config)
fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
deleted file mode 100644
index 3a9467f7..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class LoggingSinkProvider : SinkProvider {
-
- override fun invoke(ctx: ClientContext): Sink {
- return object : Sink {
- private val totalMessages = AtomicLong()
- private val totalBytes = AtomicLong()
-
- override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
- messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
-
- private fun logMessage(msg: RoutedMessage) {
- val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
- val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
- if (msgs % INFO_LOGGING_FREQ == 0L)
- logger.info(ctx, logMessageSupplier)
- else
- logger.trace(ctx, logMessageSupplier)
- }
-
- }
- }
-
- companion object {
- const val INFO_LOGGING_FREQ = 100_000
- private val logger = Logger(LoggingSinkProvider::class)
- }
-}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index 1e3f2e7a..8ea53fbe 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -43,7 +43,6 @@ internal object KafkaSinkProviderTest : Spek({
describe("non functional requirements") {
given("sample configuration") {
val config = CollectorConfiguration(
- dummyMode = false,
maxRequestSizeBytes = 1024 * 1024,
kafkaServers = "localhost:9090",
routing = routing { }.build())
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index eb3ba264..d965b787 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,7 +27,6 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 6599d402..6425601e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,11 +36,11 @@ const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
val configWithBasicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
)
val configWithTwoDomainsToOneTopicRouting = sequenceOf(
@@ -62,12 +62,12 @@ val configWithTwoDomainsToOneTopicRouting = sequenceOf(
)
val configWithDifferentRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(ALTERNATE_PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
- )
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
val configWithEmptyRouting = emptySequence<KafkaSink>()
diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json
index 7f88cb6e..67576c80 100644
--- a/sources/hv-collector-main/src/main/docker/base.json
+++ b/sources/hv-collector-main/src/main/docker/base.json
@@ -12,7 +12,6 @@
"security": {
},
"collector": {
- "dummyMode": false,
"maxRequestSizeBytes": 1048576,
"kafkaServers": [
"message-router-kafka:9092"