From 1422bed4c1a002e663fd7c2c4c204831e5f7aa9a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 28 Feb 2019 17:33:02 +0100 Subject: Use CBS by means of SDK in place of Consul - changed IO creation in main to fix error with too early calling changeState method on collector HealthState - increased coverage a little with few tests - corrected coverage-report pom file to include new modules - temporarily changed to 1.1.4-SNAPSHOT version of sdk due to need of new version of CBSLookup Change-Id: Ic73b46cf881ab4fabf52bef0327b09082aa90dc6 Issue-ID: DCAEGEN2-1302 Signed-off-by: Filip Krzywka --- build/hv-collector-coverage/pom.xml | 2 +- development/consul.d/cbs.json | 10 ++ development/docker-compose.yml | 73 +++++---- pom.xml | 7 +- .../veshv/commandline/CommandLineOption.kt | 16 +- sources/hv-collector-core/pom.xml | 19 +-- .../collectors/veshv/factory/CollectorFactory.kt | 10 +- .../veshv/impl/adapters/AdapterFactory.kt | 13 +- .../impl/adapters/ConfigurationProviderImpl.kt | 118 ++++++++++++++ .../impl/adapters/ConsulConfigurationProvider.kt | 145 ----------------- .../collectors/veshv/impl/socket/networking.kt | 2 +- .../veshv/model/ConfigurationProviderParams.kt | 5 +- .../impl/adapters/ConfigurationProviderTest.kt | 173 +++++++++++++++++++++ .../adapters/ConsulConfigurationProviderTest.kt | 151 ------------------ .../impl/adapters/kafka/ProtobufSerializerTest.kt | 65 ++++++++ .../adapters/kafka/VesMessageSerializerTest.kt | 66 ++++++++ .../healthcheck/factory/HealthCheckApiServer.kt | 3 +- .../collectors/veshv/main/ArgVesHvConfiguration.kt | 26 ++-- .../org/onap/dcae/collectors/veshv/main/main.kt | 27 ++-- .../collectors/veshv/main/servers/VesServer.kt | 4 +- .../veshv/main/ArgVesHvConfigurationTest.kt | 16 +- .../onap/dcae/collectors/veshv/main/MainTest.kt | 2 +- .../veshv/ssl/boundary/SecurityUtilsTest.kt | 65 ++++++++ .../veshv/utils/logging/reactive_logging.kt | 16 ++ .../dcae/collectors/veshv/utils/shutdown_hook.kt | 19 +-- 25 files changed, 625 insertions(+), 428 deletions(-) create mode 100644 development/consul.d/cbs.json create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt create mode 100644 sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml index 08fc5a22..31ff78fc 100644 --- a/build/hv-collector-coverage/pom.xml +++ b/build/hv-collector-coverage/pom.xml @@ -3,7 +3,7 @@ ~ ============LICENSE_START======================================================= ~ dcaegen2-collectors-veshv ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA + ~ Copyright (C) 2018-2019 NOKIA ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. diff --git a/development/consul.d/cbs.json b/development/consul.d/cbs.json new file mode 100644 index 00000000..0761c7e5 --- /dev/null +++ b/development/consul.d/cbs.json @@ -0,0 +1,10 @@ +{ + "service": { + "name": "cbs", + "tags": [ + "cbs" + ], + "port": 10000, + "address": "config-binding-service" + } +} diff --git a/development/docker-compose.yml b/development/docker-compose.yml index c93100ef..708c8f3e 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -8,13 +8,13 @@ services: message-router-zookeeper: image: wurstmeister/zookeeper ports: - - "2181:2181" + - "2181:2181" message-router-kafka: -# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 + # image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 image: wurstmeister/kafka ports: - - "9092:9092" + - "9092:9092" environment: KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" @@ -23,9 +23,9 @@ services: KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092" KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT" volumes: - - /var/run/docker.sock:/var/run/docker.sock + - /var/run/docker.sock:/var/run/docker.sock depends_on: - - message-router-zookeeper + - message-router-zookeeper # @@ -35,23 +35,34 @@ services: consul-server: image: docker.io/consul:1.0.6 ports: - - "8500:8500" - command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"] + - "8500:8500" + command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui", "-config-dir=/consul/consul.d"] + volumes: + - ./consul.d/:/consul/consul.d consul-config: image: consul - depends_on: - - consul-server restart: on-failure - command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{ + command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{ "collector.routing": [ { "fromDomain": "perf3gpp", "toTopic": "HV_VES_PERF3GPP" } ] - }'] + }' + ] + depends_on: + - consul-server + config-binding-service: + image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.platform.configbinding.app-app:2.2.4 + ports: + - "10000:10000" + environment: + CONSUL_HOST: "consul-server" + depends_on: + - consul-config # # DCAE HV VES Collector @@ -60,30 +71,32 @@ services: ves-hv-collector: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest ports: - - "6060:6060" - - "6061:6061/tcp" + - "6060:6060" + - "6061:6061/tcp" command: ["--listen-port", "6061", "--health-check-api-port", "6060", - "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", "--kafka-bootstrap-servers", "message-router-kafka:9092", "--key-store-password", "onaponap", "--trust-store-password", "onaponap", - "--first-request-delay", "2", + "--first-request-delay", "5", "--log-level", "DEBUG"] environment: JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml" + CONSUL_HOST: "consul-server" + CONFIG_BINDING_SERVICE: "cbs" + HOSTNAME: "dcae-hv-ves-collector" healthcheck: test: ./healthcheck.sh || exit 1 interval: 10s timeout: 3s retries: 3 - start_period: 20s + start_period: 15s depends_on: - - message-router-kafka - - consul-config + - message-router-kafka + - config-binding-service volumes: - - ./ssl/:/etc/ves-hv/ - - ./logs:/var/log/ONAP/dcae-hv-ves-collector + - ./ssl/:/etc/ves-hv/ + - ./logs:/var/log/ONAP/dcae-hv-ves-collector # @@ -93,8 +106,8 @@ services: xnf-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator ports: - - "6062:6062/tcp" - - "6063:6063" + - "6062:6062/tcp" + - "6063:6063" command: ["--listen-port", "6062", "--health-check-api-port", "6063", "--ves-host", "ves-hv-collector", @@ -109,19 +122,19 @@ services: retries: 3 start_period: 10s depends_on: - - ves-hv-collector + - ves-hv-collector volumes: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator ports: - - "6064:6064/tcp" + - "6064:6064/tcp" command: ["--listen-port", "6064", "--kafka-bootstrap-servers", "message-router-kafka:9092", "--kafka-topics", "HV_VES_PERF3GPP"] depends_on: - - message-router-kafka + - message-router-kafka # # Monitoring @@ -136,16 +149,16 @@ services: grafana: image: grafana/grafana ports: - - "3000:3000" + - "3000:3000" environment: GF_AUTH_DISABLE_LOGIN_FORM: "true" GF_AUTH_DISABLE_SIGNOUT_MENU: "true" GF_AUTH_ANONYMOUS_ENABLED: "true" GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin" volumes: - - ./grafana/datasources:/etc/grafana/provisioning/datasources - - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards - # defined in ./grafana/dashboards-providers/dasboard-providers.yaml - - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves + - ./grafana/datasources:/etc/grafana/provisioning/datasources + - ./grafana/dashboards-providers:/etc/grafana/provisioning/dashboards + # defined in ./grafana/dashboards-providers/dasboard-providers.yaml + - ./grafana/dashboards:/var/lib/grafana/dashboards/hv-ves diff --git a/pom.xml b/pom.xml index 7d11f03f..93226080 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ 1.7 0.8.2 1.0.0-RC11 - 1.1.3 + 1.1.4-SNAPSHOT 3.6.1 @@ -585,6 +585,11 @@ ssl ${sdk.version} + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + ${sdk.version} + diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index 0c3f60bb..31849215 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -38,26 +38,18 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false) .build(), required = true ), - CONSUL_CONFIG_URL( - Option.builder("c") - .longOpt("config-url") - .hasArg() - .desc("URL of ves configuration on consul") - .build(), - required = true - ), - CONSUL_FIRST_REQUEST_DELAY( + CONFIGURATION_FIRST_REQUEST_DELAY( Option.builder("d") .longOpt("first-request-delay") .hasArg() - .desc("Delay of first request to consul in seconds") + .desc("Delay of first request for configuration in seconds") .build() ), - CONSUL_REQUEST_INTERVAL( + CONFIGURATION_REQUEST_INTERVAL( Option.builder("I") .longOpt("request-interval") .hasArg() - .desc("Interval of consul configuration requests in seconds") + .desc("Interval of configuration requests in seconds") .build() ), VES_HV_PORT( diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 29e1ea94..c21f2ed2 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -3,7 +3,7 @@ ~ ============LICENSE_START======================================================= ~ dcaegen2-collectors-veshv ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA + ~ Copyright (C) 2018-2019 NOKIA ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ ~ ============LICENSE_END========================================================= --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -85,6 +85,10 @@ ${project.parent.version} test + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + org.jetbrains.kotlin @@ -114,15 +118,6 @@ io.projectreactor.kafka reactor-kafka - - javax.json - javax.json-api - - - org.glassfish - javax.json - runtime - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 535d1baa..633095dc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider, val config: AtomicReference = AtomicReference() configuration() .doOnNext { - logger.info { "Using updated configuration for new connections" } + logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error { "Failed to acquire configuration from consul" } + logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } + logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) return object : CollectorProvider { override fun invoke(ctx: ClientContext): Option = - config.getOption().map { createVesHvCollector(it, ctx) } + config.getOption().map { createVesHvCollector(it, ctx) } override fun close() = sinkProvider.close() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 75b6f0a6..312d6d7b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.KafkaConfiguration -import reactor.netty.http.client.HttpClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties /** * @author Piotr Jaszczyk @@ -38,8 +39,8 @@ object AdapterFactory { else KafkaSinkProvider(kafkaConfig) - fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) - - private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) + fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConfigurationProviderImpl( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + configurationProviderParams) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt new file mode 100644 index 00000000..736f474a --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.google.gson.JsonObject +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration + + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal class ConfigurationProviderImpl(private val cbsClientMono: Mono, + private val firstRequestDelay: Duration, + private val requestInterval: Duration, + private val healthState: HealthState, + retrySpec: Retry + +) : ConfigurationProvider { + constructor(cbsClientMono: Mono, params: ConfigurationProviderParams) : this( + cbsClientMono, + params.firstRequestDelay, + params.requestInterval, + HealthState.INSTANCE, + Retry.any() + .retryMax(MAX_RETRIES) + .fixedBackoff(params.requestInterval) + .jitter(Jitter.random()) + ) + + private val retry = retrySpec.doOnRetry { + logger.withWarn(ServiceContext::mdc) { + log("Exception from configuration provider client, retrying subscription", it.exception()) + } + healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + } + + override fun invoke(): Flux = + cbsClientMono + .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } + .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry) + .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } + .flatMapMany(::handleUpdates) + + private fun handleUpdates(cbsClient: CbsClient): Flux = cbsClient + .updates(RequestDiagnosticContext.create(), + firstRequestDelay, + requestInterval) + .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } + .map(::createCollectorConfiguration) + .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } + .retryWhen(retry) + + + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + try { + val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) + CollectorConfiguration( + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() + ) + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } + + private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString + + + companion object { + private const val ROUTING_CONFIGURATION_KEY = "collector.routing" + private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" + private const val TOPIC_CONFIGURATION_KEY = "toTopic" + + private const val MAX_RETRIES = 5L + private val logger = Logger(ConfigurationProviderImpl::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt deleted file mode 100644 index d58cc792..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.io.StringReader -import java.security.MessageDigest -import java.time.Duration -import java.util.* -import java.util.concurrent.atomic.AtomicReference -import javax.json.Json -import javax.json.JsonObject - - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal class ConsulConfigurationProvider(private val http: HttpAdapter, - private val url: String, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - retrySpec: Retry - -) : ConfigurationProvider { - private val lastConfigurationHash: AtomicReference = AtomicReference(byteArrayOf()) - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - constructor(http: HttpAdapter, - params: ConfigurationProviderParams) : this( - http, - params.configurationUrl, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random()) - ) - - override fun invoke(): Flux = - Flux.interval(firstRequestDelay, requestInterval) - .concatMap { askForConfig() } - .flatMap(::filterDifferentValues) - .map(::parseJsonResponse) - .map(::createCollectorConfiguration) - .retryWhen(retry) - - private fun askForConfig(): Mono = Mono.defer { - val invocationId = UUID.randomUUID() - http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) } - } - - private fun filterDifferentValues(configuration: BodyWithInvocationId) = - configuration.body.let { configurationString -> - configurationString.sha256().let { newHash -> - if (newHash contentEquals lastConfigurationHash.get()) { - logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "No change detected in consul configuration" - } - Mono.empty() - } else { - logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "Obtained new configuration from consul:\n$configurationString" - } - lastConfigurationHash.set(newHash) - Mono.just(configurationString) - } - } - } - - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readObject() - - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = - try { - val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse consul configuration", e) - } - - - companion object { - private const val ROUTING_CONFIGURATION_KEY = "collector.routing" - private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" - private const val TOPIC_CONFIGURATION_KEY = "toTopic" - - private const val MAX_RETRIES = 5L - private const val BACKOFF_INTERVAL_FACTOR = 30L - private val logger = Logger(ConsulConfigurationProvider::class) - private fun String.sha256() = - MessageDigest - .getInstance("SHA-256") - .digest(toByteArray()) - - } - - private data class BodyWithInvocationId(val body: String, val invocationId: UUID) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt index 91f502e6..a1e5b8fd 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett withConnectionFrom(nettyInbound) { connection -> clientContext.clientAddress = Try { connection.address().address }.toOption() clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } - } \ No newline at end of file + } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt index 9de34498..ac7a9db0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,5 @@ import java.time.Duration * @author Jakub Dudycz * @since July 2018 */ -data class ConfigurationProviderParams(val configurationUrl: String, - val firstRequestDelay: Duration, +data class ConfigurationProviderParams(val firstRequestDelay: Duration, val requestInterval: Duration) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt new file mode 100644 index 00000000..21aaa129 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -0,0 +1,173 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.google.gson.JsonParser +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import reactor.core.publisher.Flux + +import reactor.core.publisher.Mono +import reactor.retry.Retry +import reactor.test.StepVerifier +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal object ConfigurationProviderImplTest : Spek({ + + describe("Configuration provider") { + + val cbsClient: CbsClient = mock() + val cbsClientMock: Mono = Mono.just(cbsClient) + val healthStateProvider = HealthState.INSTANCE + + given("configuration is never in cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + + on("waiting for configuration") { + val waitTime = Duration.ofMillis(100) + + it("should not get it") { + StepVerifier.create(configProvider().take(1)) + .expectNoEvent(waitTime) + } + } + + } + given("valid configuration from cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(validConfiguration)) + it("should use received configuration") { + + StepVerifier.create(configProvider().take(1)) + .consumeNextWith { + + val route1 = it.routing.routes[0] + assertThat(FAULT.domainName) + .describedAs("routed domain 1") + .isEqualTo(route1.domain) + assertThat("test-topic-1") + .describedAs("target topic 1") + .isEqualTo(route1.targetTopic) + + val route2 = it.routing.routes[1] + assertThat(HEARTBEAT.domainName) + .describedAs("routed domain 2") + .isEqualTo(route2.domain) + assertThat("test-topic-2") + .describedAs("target topic 2") + .isEqualTo(route2.targetTopic) + + }.verifyComplete() + } + } + + } + given("invalid configuration from cbs") { + val iterationCount = 3L + val configProvider = constructConfigurationProvider( + cbsClientMock, healthStateProvider, iterationCount + ) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(invalidConfiguration)) + + it("should interrupt the flux") { + StepVerifier.create(configProvider()) + .verifyError() + } + + it("should update the health state") { + StepVerifier.create(healthStateProvider().take(iterationCount)) + .expectNextCount(iterationCount - 1) + .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + .verifyComplete() + } + } + } + } + +}) + + +private val validConfiguration = JsonParser().parse(""" +{ + "whatever": "garbage", + "collector.routing": [ + { + "fromDomain": "fault", + "toTopic": "test-topic-1" + }, + { + "fromDomain": "heartbeat", + "toTopic": "test-topic-2" + } + ] +}""").asJsonObject + +private val invalidConfiguration = JsonParser().parse(""" +{ + "whatever": "garbage", + "collector.routing": [ + { + "fromDomain": "garbage", + "meaningful": "garbage" + } + ] +}""").asJsonObject + +private val firstRequestDelay = Duration.ofMillis(1) +private val requestInterval = Duration.ofMillis(1) + +private fun constructConfigurationProvider(cbsClientMono: Mono, + healthState: HealthState, + iterationCount: Long = 1 +): ConfigurationProviderImpl { + + val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConfigurationProviderImpl( + cbsClientMono, + firstRequestDelay, + requestInterval, + healthState, + retry + ) +} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt deleted file mode 100644 index ccae3c99..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ /dev/null @@ -1,151 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.mockito.Mockito -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState - -import reactor.core.publisher.Mono -import reactor.retry.Retry -import reactor.test.StepVerifier -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal object ConsulConfigurationProviderTest : Spek({ - - describe("Consul configuration provider") { - - val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthState.INSTANCE - - given("valid resource url") { - val validUrl = "http://valid-url/" - val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) - - on("call to consul") { - whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap())) - .thenReturn(Mono.just(constructConsulResponse())) - - it("should use received configuration") { - - StepVerifier.create(consulConfigProvider().take(1)) - .consumeNextWith { - - val route1 = it.routing.routes[0] - assertThat(FAULT.domainName) - .describedAs("routed domain 1") - .isEqualTo(route1.domain) - assertThat("test-topic-1") - .describedAs("target topic 1") - .isEqualTo(route1.targetTopic) - - val route2 = it.routing.routes[1] - assertThat(HEARTBEAT.domainName) - .describedAs("routed domain 2") - .isEqualTo(route2.domain) - assertThat("test-topic-2") - .describedAs("target topic 2") - .isEqualTo(route2.targetTopic) - - }.verifyComplete() - } - } - - } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - - val iterationCount = 3L - val consulConfigProvider = constructConsulConfigProvider( - invalidUrl, httpAdapterMock, healthStateProvider, iterationCount - ) - - on("call to consul") { - whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap())) - .thenReturn(Mono.error(RuntimeException("Test exception"))) - - it("should interrupt the flux") { - - StepVerifier.create(consulConfigProvider()) - .verifyErrorMessage("Test exception") - } - - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - .verifyComplete() - } - } - } - } - -}) - -private fun constructConsulConfigProvider(url: String, - httpAdapter: HttpAdapter, - healthState: HealthState, - iterationCount: Long = 1 -): ConsulConfigurationProvider { - - val firstRequestDelay = Duration.ofMillis(1) - val requestInterval = Duration.ofMillis(1) - val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - - return ConsulConfigurationProvider( - httpAdapter, - url, - firstRequestDelay, - requestInterval, - healthState, - retry - ) -} - -fun constructConsulResponse(): String = - """{ - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "fault", - "toTopic": "test-topic-1" - }, - { - "fromDomain": "heartbeat", - "toTopic": "test-topic-2" - } - ] - }""" diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt new file mode 100644 index 00000000..63caaf0a --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import com.google.protobuf.MessageLite +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.ves.VesEventOuterClass.CommonEventHeader.* + + +class ProtobufSerializerTest : Spek({ + + describe("ProtobufSerializerTest") { + val serializer = ProtobufSerializer() + + on("serialize") { + it("should return byte array from WTP Frame paylaod") { + val header = getDefaultInstance() + val payload = header.toByteArray() + val msg: MessageLite = mock() + + serializer.serialize("", msg) + + verify(msg).toByteArray() + } + } + + on("configuring") { + it("should do nothing") { + // increase code coverage + serializer.configure(hashMapOf(), false) + } + } + + on("closing") { + it("should do nothing") { + // increase code coverage + serializer.close() + } + } + } + + +}) \ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt new file mode 100644 index 00000000..3a194b47 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventOuterClass.CommonEventHeader.* + + +class VesMessageSerializerTest : Spek({ + + describe("VesMessageSerializer") { + val serializer = VesMessageSerializer() + + on("serialize") { + it("should return byte array from WTP Frame paylaod") { + val header = getDefaultInstance() + val payload = header.toByteArray() + val msg = VesMessage(header, WireFrameMessage(payload)) + + val serialized = serializer.serialize("", msg) + + assertThat(serialized).isEqualTo(payload) + } + } + + on("configuring") { + it("should do nothing") { + // increase code coverage + serializer.configure(hashMapOf(), false) + } + } + + on("closing") { + it("should do nothing") { + // increase code coverage + serializer.close() + } + } + } + + + +}) \ No newline at end of file diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index 32486009..fb5bb9a2 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.netty.DisposableServer import reactor.netty.http.server.HttpServer import reactor.netty.http.server.HttpServerRequest import reactor.netty.http.server.HttpServerResponse diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 99ec5e1e..bb484cfe 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,8 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC @@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration = Option.monad().binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY + CONFIGURATION_FIRST_REQUEST_DELAY, + DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY ) val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL + CONFIGURATION_REQUEST_INTERVAL, + DefaultValues.CONFIGURATION_REQUEST_INTERVAL ) ConfigurationProviderParams( - configUrl, Duration.ofSeconds(firstRequestDelay), Duration.ofSeconds(requestInterval) ) @@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration) = logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.debug(ServiceContext::mdc) { "Finished" } } + { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() - VesServer.start(config).bind().let { handle -> - registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() - handle.await().bind() - } - }.fix() + val hvVesHandle = VesServer.start(config).bind() -internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO = - IO.monadError().binding { - healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).bind() - logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } + registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) + hvVesHandle.await().bind() }.fix() + +internal fun closeServers(vararg handles: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) = { + logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } + healthState.changeState(HealthDescription.SHUTTING_DOWN) + Closeable.closeAll(handles.asIterable()).unsafeRunSync() + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 4e2e6d86..62c24308 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ object VesServer : ServerStarter() { private fun createVesServer(config: ServerConfiguration): Server { val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + AdapterFactory.configurationProvider(config.configurationProviderParams), AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index da2bfb48..6d219106 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" - val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" val requestInterval = "5" val listenPort = "6969" @@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({ assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") } - it("should set proper first consul request delay") { + it("should set proper first request delay") { assertThat(result.configurationProviderParams.firstRequestDelay) .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) } - it("should set proper consul request interval") { + it("should set proper request interval") { assertThat(result.configurationProviderParams.requestInterval) .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) } - it("should set proper config url") { - assertThat(result.configurationProviderParams.configurationUrl) - .isEqualTo(configurationUrl) - } - it("should set proper security configuration") { assertThat(result.securityConfiguration.keys.isEmpty()).isFalse() @@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({ it("should throw exception") { assertThat( cut.parseExpectingFailure( - "--config-url", configurationUrl, "--ssl-disable", "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval @@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index e032f00e..e18b0b10 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -51,7 +51,7 @@ internal object MainTest : Spek({ val healthState: HealthState = mock() on("closeServers") { - closeServers(handle, healthState = healthState).unsafeRunSync() + closeServers(handle, healthState = healthState).invoke() it("should close all handles") { assertThat(closed).isTrue() diff --git a/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt b/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt new file mode 100644 index 00000000..ddb3e357 --- /dev/null +++ b/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityUtilsTest.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ssl.boundary + +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.apache.commons.cli.CommandLine +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.commandline.hasOption + + +internal object SecurityUtilsTest : Spek({ + + describe("creating securty configuration provider") { + + on("command line without ssl disable") { + val commandLine: CommandLine = mock() + whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(false) + + it("should create configuration with some keys") { + val configuration = createSecurityConfiguration(commandLine) + + verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE) + assertThat(configuration.isSuccess()).isTrue() + configuration.map { assertThat(it.keys.isDefined()).isTrue() } + } + } + on("command line with ssl disabled") { + val commandLine: CommandLine = mock() + whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(true) + + it("should create configuration without keys") { + val configuration = createSecurityConfiguration(commandLine) + + verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE) + assertThat(configuration.isSuccess()).isTrue() + configuration.map { assertThat(it.keys.isEmpty()).isTrue() } + } + } + } +}) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 99ecfd74..7d92ddaf 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -72,3 +72,19 @@ fun Flux.filterFailedWithLog(logger: Logger, Mono.just(t) }) } + + +fun Mono.onErrorLog(logger: Logger, + mdc: () -> Map, + msg: () -> String) = + doOnError { logException(logger, mdc, msg, it) } + +fun Flux.onErrorLog(logger: Logger, + mdc: () -> Map, + msg: () -> String) = + doOnError { logException(logger, mdc, msg, it) } + +private fun logException(logger: Logger, mdc: () -> Map, msg: () -> String, it: Throwable) { + logger.error(mdc) { "${msg()}: ${it.message}" } + logger.debug(mdc) { "Detailed stack trace: ${it}" } +} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt index 2678a8d5..87aea41e 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt @@ -19,23 +19,10 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO - /** * @author Piotr Jaszczyk * @since January 2019 */ - -fun registerShutdownHook(job: () -> Unit) { - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - job() - } - }) -} - -fun registerShutdownHook(job: IO) = IO { - registerShutdownHook { - job.unsafeRunSync() - } -} +fun registerShutdownHook(job: () -> Unit) = + Runtime.getRuntime() + .addShutdownHook(Thread({ job() }, "GracefulShutdownThread")) -- cgit 1.2.3-korg