aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--build/hv-collector-coverage/pom.xml2
-rw-r--r--development/consul.d/cbs.json10
-rw-r--r--development/docker-compose.yml73
-rw-r--r--pom.xml7
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt16
-rw-r--r--sources/hv-collector-core/pom.xml19
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt13
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt118
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt145
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt5
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt)112
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt65
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt66
-rw-r--r--sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt3
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt26
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt27
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt4
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt2
-rw-r--r--sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt65
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt16
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt19
24 files changed, 519 insertions, 322 deletions
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml
index 08fc5a22..31ff78fc 100644
--- a/build/hv-collector-coverage/pom.xml
+++ b/build/hv-collector-coverage/pom.xml
@@ -3,7 +3,7 @@
~ ============LICENSE_START=======================================================
~ dcaegen2-collectors-veshv
~ ================================================================================
- ~ Copyright (C) 2018 NOKIA
+ ~ Copyright (C) 2018-2019 NOKIA
~ ================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
diff --git a/development/consul.d/cbs.json b/development/consul.d/cbs.json
new file mode 100644
index 00000000..0761c7e5
--- /dev/null
+++ b/development/consul.d/cbs.json
@@ -0,0 +1,10 @@
+{
+ "service": {
+ "name": "cbs",
+ "tags": [
+ "cbs"
+ ],
+ "port": 10000,
+ "address": "config-binding-service"
+ }
+}
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index c93100ef..708c8f3e 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -8,13 +8,13 @@ services:
message-router-zookeeper:
image: wurstmeister/zookeeper
ports:
- - "2181:2181"
+ - "2181:2181"
message-router-kafka:
-# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
+ # image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
image: wurstmeister/kafka
ports:
- - "9092:9092"
+ - "9092:9092"
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
@@ -23,9 +23,9 @@ services:
KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
volumes:
- - /var/run/docker.sock:/var/run/docker.sock
+ - /var/run/docker.sock:/var/run/docker.sock
depends_on:
- - message-router-zookeeper
+ - message-router-zookeeper
#
@@ -35,23 +35,34 @@ services:
consul-server:
image: docker.io/consul:1.0.6
ports:
- - "8500:8500"
- command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"]
+ - "8500:8500"
+ command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui", "-config-dir=/consul/consul.d"]
+ volumes:
+ - ./consul.d/:/consul/consul.d
consul-config:
image: consul
- depends_on:
- - consul-server
restart: on-failure
- command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
+ command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{
"collector.routing": [
{
"fromDomain": "perf3gpp",
"toTopic": "HV_VES_PERF3GPP"
}
]
- }']
+ }'
+ ]
+ depends_on:
+ - consul-server
+ config-binding-service:
+ image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.platform.configbinding.app-app:2.2.4
+ ports:
+ - "10000:10000"
+ environment:
+ CONSUL_HOST: "consul-server"
+ depends_on:
+ - consul-config
#
# DCAE HV VES Collector
@@ -60,30 +71,32 @@ services:
ves-hv-collector:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
ports:
- - "6060:6060"
- - "6061:6061/tcp"
+ - "6060:6060"
+ - "6061:6061/tcp"
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
- "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
"--kafka-bootstrap-servers", "message-router-kafka:9092",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap",
- "--first-request-delay", "2",
+ "--first-request-delay", "5",
"--log-level", "DEBUG"]
environment:
JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+ CONSUL_HOST: "consul-server"
+ CONFIG_BINDING_SERVICE: "cbs"
+ HOSTNAME: "dcae-hv-ves-collector"
healthcheck:
test: ./healthcheck.sh || exit 1
interval: 10s
timeout: 3s
retries: 3
- start_period: 20s
+ start_period: 15s
depends_on:
- - message-router-kafka
- - consul-config
+ - message-router-kafka
+ - config-binding-service
volumes:
- - ./ssl/:/etc/ves-hv/
- - ./logs:/var/log/ONAP/dcae-hv-ves-collector
+ - ./ssl/:/etc/ves-hv/
+ - ./logs:/var/log/ONAP/dcae-hv-ves-collector
#
@@ -93,8 +106,8 @@ services:
xnf-simulator:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
ports:
- - "6062:6062/tcp"
- - "6063:6063"
+ - "6062:6062/tcp"
+ - "6063:6063"
command: ["--listen-port", "6062",
"--health-check-api-port", "6063",
"--ves-host", "ves-hv-collector",
@@ -109,19 +122,19 @@ services:
retries: 3
start_period: 10s
depends_on:
- - ves-hv-collector
+ - ves-hv-collector
volumes:
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
ports:
- - "6064:6064/tcp"
+ - "6064:6064/tcp"
command: ["--listen-port", "6064",
"--kafka-bootstrap-servers", "message-router-kafka:9092",
"--kafka-topics", "HV_VES_PERF3GPP"]
depends_on:
- - message-router-kafka
+ - message-router-kafka
#
# Monitoring
@@ -136,16 +149,16 @@ services:
grafana:
image: grafana/grafana
ports:
- - "3000:3000"
+ - "3000:3000"
environment:
GF_AUTH_DISABLE_LOGIN_FORM: "true"
GF_AUTH_DISABLE_SIGNOUT_MENU: "true"
GF_AUTH_ANONYMOUS_ENABLED: "true"
GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
volumes:
- - ./grafana/datasources:/etc/grafana/provisioning/datasources
- - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards
- # defined in ./grafana/dashboards-providers/dasboard-providers.yaml
- - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves
+ - ./grafana/datasources:/etc/grafana/provisioning/datasources
+ - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards
+ # defined in ./grafana/dashboards-providers/dasboard-providers.yaml
+ - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves
diff --git a/pom.xml b/pom.xml
index 7d11f03f..93226080 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
<jacoco.version>0.8.2</jacoco.version>
<detekt.version>1.0.0-RC11</detekt.version>
- <sdk.version>1.1.3</sdk.version>
+ <sdk.version>1.1.4-SNAPSHOT</sdk.version>
<!-- Protocol buffers -->
<protobuf.version>3.6.1</protobuf.version>
@@ -585,6 +585,11 @@
<artifactId>ssl</artifactId>
<version>${sdk.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ <version>${sdk.version}</version>
+ </dependency>
<!-- Test dependencies -->
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
index 0c3f60bb..31849215 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
@@ -38,26 +38,18 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
.build(),
required = true
),
- CONSUL_CONFIG_URL(
- Option.builder("c")
- .longOpt("config-url")
- .hasArg()
- .desc("URL of ves configuration on consul")
- .build(),
- required = true
- ),
- CONSUL_FIRST_REQUEST_DELAY(
+ CONFIGURATION_FIRST_REQUEST_DELAY(
Option.builder("d")
.longOpt("first-request-delay")
.hasArg()
- .desc("Delay of first request to consul in seconds")
+ .desc("Delay of first request for configuration in seconds")
.build()
),
- CONSUL_REQUEST_INTERVAL(
+ CONFIGURATION_REQUEST_INTERVAL(
Option.builder("I")
.longOpt("request-interval")
.hasArg()
- .desc("Interval of consul configuration requests in seconds")
+ .desc("Interval of configuration requests in seconds")
.build()
),
VES_HV_PORT(
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 29e1ea94..c21f2ed2 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -3,7 +3,7 @@
~ ============LICENSE_START=======================================================
~ dcaegen2-collectors-veshv
~ ================================================================================
- ~ Copyright (C) 2018 NOKIA
+ ~ Copyright (C) 2018-2019 NOKIA
~ ================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -19,8 +19,8 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<licenses>
@@ -85,6 +85,10 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
@@ -114,15 +118,6 @@
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
- <dependency>
- <groupId>javax.json</groupId>
- <artifactId>javax.json-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- <scope>runtime</scope>
- </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 535d1baa..633095dc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider,
val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
.doOnNext {
- logger.info { "Using updated configuration for new connections" }
+ logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error { "Failed to acquire configuration from consul" }
+ logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+ logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
return object : CollectorProvider {
override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
override fun close() = sinkProvider.close()
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 75b6f0a6..312d6d7b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
-import reactor.netty.http.client.HttpClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,8 +39,8 @@ object AdapterFactory {
else
KafkaSinkProvider(kafkaConfig)
- fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
- private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+ fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConfigurationProviderImpl(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ configurationProviderParams)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
new file mode 100644
index 00000000..736f474a
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration,
+ private val healthState: HealthState,
+ retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+ constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ cbsClientMono,
+ params.firstRequestDelay,
+ params.requestInterval,
+ HealthState.INSTANCE,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval)
+ .jitter(Jitter.random())
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(ServiceContext::mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ }
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ .updates(RequestDiagnosticContext.create(),
+ firstRequestDelay,
+ requestInterval)
+ .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+ .map(::createCollectorConfiguration)
+ .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ try {
+ val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
+ CollectorConfiguration(
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+
+ private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+
+
+ companion object {
+ private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
+ private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
+ private const val TOPIC_CONFIGURATION_KEY = "toTopic"
+
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(ConfigurationProviderImpl::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644
index d58cc792..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.security.MessageDigest
-import java.time.Duration
-import java.util.*
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
- private val url: String,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
- private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
- private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- }
-
- constructor(http: HttpAdapter,
- params: ConfigurationProviderParams) : this(
- http,
- params.configurationUrl,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random())
- )
-
- override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(firstRequestDelay, requestInterval)
- .concatMap { askForConfig() }
- .flatMap(::filterDifferentValues)
- .map(::parseJsonResponse)
- .map(::createCollectorConfiguration)
- .retryWhen(retry)
-
- private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
- val invocationId = UUID.randomUUID()
- http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
- }
-
- private fun filterDifferentValues(configuration: BodyWithInvocationId) =
- configuration.body.let { configurationString ->
- configurationString.sha256().let { newHash ->
- if (newHash contentEquals lastConfigurationHash.get()) {
- logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "No change detected in consul configuration"
- }
- Mono.empty()
- } else {
- logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "Obtained new configuration from consul:\n$configurationString"
- }
- lastConfigurationHash.set(newHash)
- Mono.just(configurationString)
- }
- }
- }
-
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readObject()
-
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
- try {
- val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse consul configuration", e)
- }
-
-
- companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
- private const val MAX_RETRIES = 5L
- private const val BACKOFF_INTERVAL_FACTOR = 30L
- private val logger = Logger(ConsulConfigurationProvider::class)
- private fun String.sha256() =
- MessageDigest
- .getInstance("SHA-256")
- .digest(toByteArray())
-
- }
-
- private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
index 91f502e6..a1e5b8fd 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
@@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett
withConnectionFrom(nettyInbound) { connection ->
clientContext.clientAddress = Try { connection.address().address }.toOption()
clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
- } \ No newline at end of file
+ }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
index 9de34498..ac7a9db0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,5 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since July 2018
*/
-data class ConfigurationProviderParams(val configurationUrl: String,
- val firstRequestDelay: Duration,
+data class ConfigurationProviderParams(val firstRequestDelay: Duration,
val requestInterval: Duration)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index ccae3c99..21aaa129 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
@@ -29,11 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.mockito.Mockito
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -44,24 +46,36 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal object ConsulConfigurationProviderTest : Spek({
+internal object ConfigurationProviderImplTest : Spek({
- describe("Consul configuration provider") {
+ describe("Configuration provider") {
- val httpAdapterMock: HttpAdapter = mock()
+ val cbsClient: CbsClient = mock()
+ val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
val healthStateProvider = HealthState.INSTANCE
- given("valid resource url") {
- val validUrl = "http://valid-url/"
- val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+ given("configuration is never in cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
- .thenReturn(Mono.just(constructConsulResponse()))
+ on("waiting for configuration") {
+ val waitTime = Duration.ofMillis(100)
+ it("should not get it") {
+ StepVerifier.create(configProvider().take(1))
+ .expectNoEvent(waitTime)
+ }
+ }
+
+ }
+ given("valid configuration from cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(validConfiguration))
it("should use received configuration") {
- StepVerifier.create(consulConfigProvider().take(1))
+ StepVerifier.create(configProvider().take(1))
.consumeNextWith {
val route1 = it.routing.routes[0]
@@ -85,22 +99,19 @@ internal object ConsulConfigurationProviderTest : Spek({
}
}
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
-
+ given("invalid configuration from cbs") {
val iterationCount = 3L
- val consulConfigProvider = constructConsulConfigProvider(
- invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ val configProvider = constructConfigurationProvider(
+ cbsClientMock, healthStateProvider, iterationCount
)
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
- .thenReturn(Mono.error(RuntimeException("Test exception")))
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
-
- StepVerifier.create(consulConfigProvider())
- .verifyErrorMessage("Test exception")
+ StepVerifier.create(configProvider())
+ .verifyError()
}
it("should update the health state") {
@@ -115,28 +126,9 @@ internal object ConsulConfigurationProviderTest : Spek({
})
-private fun constructConsulConfigProvider(url: String,
- httpAdapter: HttpAdapter,
- healthState: HealthState,
- iterationCount: Long = 1
-): ConsulConfigurationProvider {
-
- val firstRequestDelay = Duration.ofMillis(1)
- val requestInterval = Duration.ofMillis(1)
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
- return ConsulConfigurationProvider(
- httpAdapter,
- url,
- firstRequestDelay,
- requestInterval,
- healthState,
- retry
- )
-}
-
-fun constructConsulResponse(): String =
- """{
+private val validConfiguration = JsonParser().parse("""
+{
"whatever": "garbage",
"collector.routing": [
{
@@ -148,4 +140,34 @@ fun constructConsulResponse(): String =
"toTopic": "test-topic-2"
}
]
- }"""
+}""").asJsonObject
+
+private val invalidConfiguration = JsonParser().parse("""
+{
+ "whatever": "garbage",
+ "collector.routing": [
+ {
+ "fromDomain": "garbage",
+ "meaningful": "garbage"
+ }
+ ]
+}""").asJsonObject
+
+private val firstRequestDelay = Duration.ofMillis(1)
+private val requestInterval = Duration.ofMillis(1)
+
+private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+ healthState: HealthState,
+ iterationCount: Long = 1
+): ConfigurationProviderImpl {
+
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConfigurationProviderImpl(
+ cbsClientMono,
+ firstRequestDelay,
+ requestInterval,
+ healthState,
+ retry
+ )
+}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
new file mode 100644
index 00000000..63caaf0a
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class ProtobufSerializerTest : Spek({
+
+ describe("ProtobufSerializerTest") {
+ val serializer = ProtobufSerializer()
+
+ on("serialize") {
+ it("should return byte array from WTP Frame paylaod") {
+ val header = getDefaultInstance()
+ val payload = header.toByteArray()
+ val msg: MessageLite = mock()
+
+ serializer.serialize("", msg)
+
+ verify(msg).toByteArray()
+ }
+ }
+
+ on("configuring") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.configure(hashMapOf<String, String>(), false)
+ }
+ }
+
+ on("closing") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.close()
+ }
+ }
+ }
+
+
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
new file mode 100644
index 00000000..3a194b47
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class VesMessageSerializerTest : Spek({
+
+ describe("VesMessageSerializer") {
+ val serializer = VesMessageSerializer()
+
+ on("serialize") {
+ it("should return byte array from WTP Frame paylaod") {
+ val header = getDefaultInstance()
+ val payload = header.toByteArray()
+ val msg = VesMessage(header, WireFrameMessage(payload))
+
+ val serialized = serializer.serialize("", msg)
+
+ assertThat(serialized).isEqualTo(payload)
+ }
+ }
+
+ on("configuring") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.configure(hashMapOf<String, String>(), false)
+ }
+ }
+
+ on("closing") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.close()
+ }
+ }
+ }
+
+
+
+}) \ No newline at end of file
diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 32486009..fb5bb9a2 100644
--- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.netty.DisposableServer
import reactor.netty.http.server.HttpServer
import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerResponse
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 99ec5e1e..bb484cfe 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,9 +27,8 @@ import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
@@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
- CONSUL_CONFIG_URL,
- CONSUL_FIRST_REQUEST_DELAY,
- CONSUL_REQUEST_INTERVAL,
+ CONFIGURATION_FIRST_REQUEST_DELAY,
+ CONFIGURATION_REQUEST_INTERVAL,
SSL_DISABLE,
KEY_STORE_FILE,
KEY_STORE_PASSWORD,
@@ -117,17 +115,15 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
Option.monad().binding {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
val firstRequestDelay = cmdLine.longValue(
- CONSUL_FIRST_REQUEST_DELAY,
- DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ CONFIGURATION_FIRST_REQUEST_DELAY,
+ DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY
)
val requestInterval = cmdLine.longValue(
- CONSUL_REQUEST_INTERVAL,
- DefaultValues.CONSUL_REQUEST_INTERVAL
+ CONFIGURATION_REQUEST_INTERVAL,
+ DefaultValues.CONFIGURATION_REQUEST_INTERVAL
)
ConfigurationProviderParams(
- configUrl,
Duration.ofSeconds(firstRequestDelay),
Duration.ofSeconds(requestInterval)
)
@@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
internal object DefaultValues {
const val HEALTH_CHECK_API_PORT = 6060
- const val CONSUL_FIRST_REQUEST_DELAY = 10L
- const val CONSUL_REQUEST_INTERVAL = 5L
+ const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L
+ const val CONFIGURATION_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
val LOG_LEVEL = LogLevel.INFO.name
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index f00b9ee4..d21b490c 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main
import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monad.monad
-import arrow.effects.instances.io.monadError.monadError
import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
@@ -51,23 +50,25 @@ fun main(args: Array<String>) =
logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
ExitFailure(1)
},
- { logger.debug(ServiceContext::mdc) { "Finished" } }
+ { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
)
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
logger.info { "Using configuration: $config" }
+
val healthCheckServerHandle = HealthCheckServer.start(config).bind()
- VesServer.start(config).bind().let { handle ->
- registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
- handle.await().bind()
- }
- }.fix()
+ val hvVesHandle = VesServer.start(config).bind()
-internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO<Unit> =
- IO.monadError().binding {
- healthState.changeState(HealthDescription.SHUTTING_DOWN)
- Closeable.closeAll(handles.asIterable()).bind()
- logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+ registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
+ hvVesHandle.await().bind()
}.fix()
+
+internal fun closeServers(vararg handles: ServerHandle,
+ healthState: HealthState = HealthState.INSTANCE) = {
+ logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
+ healthState.changeState(HealthDescription.SHUTTING_DOWN)
+ Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+ logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index 4e2e6d86..62c24308 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@ object VesServer : ServerStarter() {
private fun createVesServer(config: ServerConfiguration): Server {
val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ AdapterFactory.configurationProvider(config.configurationProviderParams),
AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index da2bfb48..6d219106 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
- val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
val requestInterval = "5"
val listenPort = "6969"
@@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
@@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({
assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
}
- it("should set proper first consul request delay") {
+ it("should set proper first request delay") {
assertThat(result.configurationProviderParams.firstRequestDelay)
.isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
}
- it("should set proper consul request interval") {
+ it("should set proper request interval") {
assertThat(result.configurationProviderParams.requestInterval)
.isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
}
- it("should set proper config url") {
- assertThat(result.configurationProviderParams.configurationUrl)
- .isEqualTo(configurationUrl)
- }
-
it("should set proper security configuration") {
assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
@@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({
it("should throw exception") {
assertThat(
cut.parseExpectingFailure(
- "--config-url", configurationUrl,
"--ssl-disable",
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval
@@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
@@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index e032f00e..e18b0b10 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -51,7 +51,7 @@ internal object MainTest : Spek({
val healthState: HealthState = mock()
on("closeServers") {
- closeServers(handle, healthState = healthState).unsafeRunSync()
+ closeServers(handle, healthState = healthState).invoke()
it("should close all handles") {
assertThat(closed).isTrue()
diff --git a/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt b/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt
new file mode 100644
index 00000000..ddb3e357
--- /dev/null
+++ b/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ssl.boundary
+
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.apache.commons.cli.CommandLine
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.commandline.hasOption
+
+
+internal object SecurityUtilsTest : Spek({
+
+ describe("creating securty configuration provider") {
+
+ on("command line without ssl disable") {
+ val commandLine: CommandLine = mock()
+ whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(false)
+
+ it("should create configuration with some keys") {
+ val configuration = createSecurityConfiguration(commandLine)
+
+ verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE)
+ assertThat(configuration.isSuccess()).isTrue()
+ configuration.map { assertThat(it.keys.isDefined()).isTrue() }
+ }
+ }
+ on("command line with ssl disabled") {
+ val commandLine: CommandLine = mock()
+ whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(true)
+
+ it("should create configuration without keys") {
+ val configuration = createSecurityConfiguration(commandLine)
+
+ verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE)
+ assertThat(configuration.isSuccess()).isTrue()
+ configuration.map { assertThat(it.keys.isEmpty()).isTrue() }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 99ecfd74..7d92ddaf 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -72,3 +72,19 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
Mono.just<T>(t)
})
}
+
+
+fun <T> Mono<T>.onErrorLog(logger: Logger,
+ mdc: () -> Map<String, String>,
+ msg: () -> String) =
+ doOnError { logException(logger, mdc, msg, it) }
+
+fun <T> Flux<T>.onErrorLog(logger: Logger,
+ mdc: () -> Map<String, String>,
+ msg: () -> String) =
+ doOnError { logException(logger, mdc, msg, it) }
+
+private fun logException(logger: Logger, mdc: () -> Map<String, String>, msg: () -> String, it: Throwable) {
+ logger.error(mdc) { "${msg()}: ${it.message}" }
+ logger.debug(mdc) { "Detailed stack trace: ${it}" }
+}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
index 2678a8d5..87aea41e 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
@@ -19,23 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
-
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
-
-fun registerShutdownHook(job: () -> Unit) {
- Runtime.getRuntime().addShutdownHook(object : Thread() {
- override fun run() {
- job()
- }
- })
-}
-
-fun registerShutdownHook(job: IO<Unit>) = IO {
- registerShutdownHook {
- job.unsafeRunSync()
- }
-}
+fun registerShutdownHook(job: () -> Unit) =
+ Runtime.getRuntime()
+ .addShutdownHook(Thread({ job() }, "GracefulShutdownThread"))