diff options
59 files changed, 1473 insertions, 593 deletions
@@ -62,10 +62,23 @@ committers: company: 'ATT' id: 'jflucas' timezone: 'America/New_York' - - name: 'Przemyslaw Wasala ' - email: 'przemyslaw.wasala@nokia.com' + - name: 'Piotr Jaszczyk' + email: 'piotr.jaszczyk@nokia.com' company: 'Nokia' - id: 'przemyslaw.wasala' + id: 'jaszczur' + timezone: 'Europe/Warsaw' + - name: 'Piotr Wielebski' + email: 'piotr.wielebski@nokia.com' + company: 'Nokia' + id: 'pwielebs' timezone: 'Europe/Warsaw' tsc: approval: 'https://lists.onap.org/pipermail/onap-tsc' + changes: + - type: 'Removal' + name: 'Przemyslaw Wasala' + link: 'https://lists.onap.org/g/onap-tsc/message/4248' + - type: 'Addition' + name: 'Piotr Jaszczyk' + name: 'Piotr Wielebski' + link: 'https://lists.onap.org/g/onap-tsc/message/4259' diff --git a/development/bin/consul.sh b/development/bin/consul.sh new file mode 100755 index 00000000..c229f83e --- /dev/null +++ b/development/bin/consul.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + +usage() { + echo "Put HV-VES configuration into Consul key-value store" + echo "Usage: $0 [-h|--help] [-v|--verbose] [domain [topic]]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +DOMAIN=${1:-perf3gpp} +TOPIC=${2:-HV_VES_PERF3GPP} + +CONFIGURATION=" +{ + \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\", + \"collector.routing\": + [{ + \"fromDomain\": \"${DOMAIN}\", + \"toTopic\": \"${TOPIC}\" + }] +}" +CONFIGURATION_ENDPOINT=localhost:8500/v1/kv/veshv-config + + +if [ -n "${VERBOSE+x}" ]; then + echo "Configuration: ${CONFIGURATION}" + echo "Putting configuration under ${CONFIGURATION_ENDPOINT}." +fi +curl --request PUT ${CONFIGURATION_ENDPOINT} -d "${CONFIGURATION}" +echo diff --git a/development/bin/dcae-msgs.sh b/development/bin/dcae-msgs.sh new file mode 100755 index 00000000..cb05a8c3 --- /dev/null +++ b/development/bin/dcae-msgs.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + +usage() { + echo "Return current amount of consumed messages by dcae-app-simulator" + echo "Usage: $0 [-h|--help] [-v|--verbose]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +if [ -n "${VERBOSE+x}" ]; then + echo "All messages count currently consumed by dcae app simulator: " +fi + +curl --request GET localhost:6063/messages/all/count +echo diff --git a/development/bin/dcae-reset.sh b/development/bin/dcae-reset.sh new file mode 100755 index 00000000..e5b7b056 --- /dev/null +++ b/development/bin/dcae-reset.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + + +usage() { + echo "Resets dcae-app-simulator consumed messages count" + echo "Usage: $0 [-h|--help] [-v|--verbose]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +if [ -n "${VERBOSE+x}" ]; then + echo "Requesting DCAE app running on port 6063 to reset messages count" +fi + +curl --request DELETE localhost:6063/messages +echo diff --git a/development/bin/dcae-topic.sh b/development/bin/dcae-topic.sh new file mode 100755 index 00000000..8c176221 --- /dev/null +++ b/development/bin/dcae-topic.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + +usage() { + echo "Set dcae-app-simulator to start consuming messages from given topic (HV_VES_PERF3GPP by default)" + echo "Usage: $0 [-h|--help] [-v|--verbose] [topic]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +TOPIC=${1:-HV_VES_PERF3GPP} + +if [ -n "${VERBOSE+x}" ]; then + echo "Requesting DCAE app running on port 6063 to consume messages from topic: ${TOPIC}" +fi + +curl --request PUT localhost:6063/configuration/topics -d ${TOPIC} +echo
\ No newline at end of file diff --git a/development/bin/run-xnf-simulator.sh b/development/bin/run-xnf-simulator.sh new file mode 100755 index 00000000..3fe96928 --- /dev/null +++ b/development/bin/run-xnf-simulator.sh @@ -0,0 +1,81 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + +usage() { + echo "Start xnf-simulator container on given port and inside of given docker-network (by default 'development_default')" + echo "Usage: $0 [-h|--help] [-v|--verbose] <xnf listen port> [<hv ves docker network>]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +[ $# -eq 0 ] && usage + + +LISTEN_PORT=$1 +HV_VES_NETWORK=${2:-development_default} + +PORTS="${LISTEN_PORT}:${LISTEN_PORT}/tcp" +HV_VES_REPO_HOME=`pwd`/.. + +if [ -n "${VERBOSE+x}" ]; then + echo "Starting xnf-simulator with ports configuration: ${PORTS} on network: ${HV_VES_NETWORK}" + echo "Container id:" +fi +docker run -d \ + -v ${HV_VES_REPO_HOME}/ssl/:/etc/ves-hv/ \ + -p ${PORTS} \ + --network ${HV_VES_NETWORK} \ + onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator \ + --listen-port ${LISTEN_PORT} \ + --ves-host ves-hv-collector \ + --ves-port 6061 \ + --key-store-password onaponap \ + --trust-store-password onaponap
\ No newline at end of file diff --git a/development/bin/xnf-simulation.sh b/development/bin/xnf-simulation.sh new file mode 100755 index 00000000..e1d65aa0 --- /dev/null +++ b/development/bin/xnf-simulation.sh @@ -0,0 +1,103 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + +usage() { + echo "Send request to xnf-simulator" + echo "Usage: $0 [-h|--help] [-v|--verbose] [<xnf listen port> [<messages amount> [<messages type> [<xnf endpoint>]]]]" + exit 1 +} + +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True + ;; + help) + usage + ;; + *) + echo "Unknown option --${OPTARG}" >&2 + usage + ;; + esac + ;; + v) + VERBOSE=True + ;; + h) + usage + ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage + ;; + esac +done +shift $((OPTIND-1)) + +XNF_PORT=${1:-6062} +MESSAGES_AMOUNT=${2:-1} +MESSAGES_TYPE=${3:-VALID} +XNF_ENDPOINT=simulator/async + +if [ -n "${VERBOSE+x}" ]; then + echo "Requesting xnf-simulator on port ${XNF_PORT} to send ${MESSAGES_AMOUNT} messages of type ${MESSAGES_TYPE}" +fi + +REQUEST_ID=$(curl --request POST -s localhost:${XNF_PORT}/${XNF_ENDPOINT} -d " +[ + { + \"commonEventHeader\": { + \"version\": \"sample-version\", + \"domain\": \"perf3gpp\", + \"sequence\": 1, + \"priority\": 1, + \"eventId\": \"sample-event-id\", + \"eventName\": \"sample-event-name\", + \"eventType\": \"sample-event-type\", + \"startEpochMicrosec\": 120034455, + \"lastEpochMicrosec\": 120034455, + \"nfNamingCode\": \"sample-nf-naming-code\", + \"nfcNamingCode\": \"sample-nfc-naming-code\", + \"reportingEntityId\": \"sample-reporting-entity-id\", + \"reportingEntityName\": \"sample-reporting-entity-name\", + \"sourceId\": \"sample-source-id\", + \"sourceName\": \"sample-source-name\", + \"vesEventListenerVersion\": \"7.2.0\" + }, + \"messageType\": \"${MESSAGES_TYPE}\", + \"messagesAmount\": ${MESSAGES_AMOUNT} + } +]") + +if [ -n "${VERBOSE+x}" ]; then + echo -e "Request id: ${REQUEST_ID}\n" + + echo "To check request status execute:" + echo "curl --request GET localhost:${XNF_PORT}/simulator/${REQUEST_ID}" + echo "To further debug you can try something similiar to:" + echo "docker ps -a | grep ${XNF_PORT} | awk '{ print \$1 }' | xargs docker logs" +else + echo "${REQUEST_ID}" +fi
\ No newline at end of file diff --git a/development/docker-compose.yml b/development/docker-compose.yml new file mode 100644 index 00000000..d4c3f1d8 --- /dev/null +++ b/development/docker-compose.yml @@ -0,0 +1,112 @@ +version: "3.5" +services: + + # + # DMaaP Message Router + # + + message-router-zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + + message-router-kafka: +# image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 + image: wurstmeister/kafka + ports: + - "9092:9092" + environment: + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092" + KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - message-router-zookeeper + + + # + # Consul / CBS + # + + consul-server: + image: docker.io/consul:1.0.6 + ports: + - "8500:8500" + command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"] + + consul-config: + image: consul + depends_on: + - consul-server + restart: on-failure + command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{ + "dmaap.kafkaBootstrapServers": "message-router-kafka:9092", + "collector.routing": [ + { + "fromDomain": "perf3gpp", + "toTopic": "HV_VES_PERF3GPP" + } + ] + }'] + + + # + # DCAE HV VES Collector + # + + ves-hv-collector: + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest + ports: + - "6060:6060" + - "6061:6061/tcp" + entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", + "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] + command: ["--listen-port", "6061", + "--health-check-api-port", "6060", + "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", + "--key-store-password", "onaponap", + "--trust-store-password", "onaponap"] + healthcheck: + test: curl -f http://localhost:6060/health/ready || exit 1 + interval: 10s + timeout: 3s + retries: 3 + start_period: 20s + depends_on: + - message-router-kafka + - consul-server + volumes: + - ./ssl/:/etc/ves-hv/ + + + # + # Simulators + # + + xnf-simulator: + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator + ports: + - "6062:6062/tcp" + command: ["--listen-port", "6062", + "--ves-host", "ves-hv-collector", + "--ves-port", "6061", + "--key-store-password", "onaponap", + "--trust-store-password", "onaponap"] + depends_on: + - ves-hv-collector + volumes: + - ./ssl/:/etc/ves-hv/ + + dcae-app-simulator: + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator + ports: + - "6063:6063/tcp" + command: ["--listen-port", "6063", + "--kafka-bootstrap-servers", "message-router-kafka:9092", + "--kafka-topics", "HV_VES_PERF3GPP"] + depends_on: + - message-router-kafka diff --git a/ssl/.gitignore b/development/ssl/.gitignore index 23888eb0..23888eb0 100644 --- a/ssl/.gitignore +++ b/development/ssl/.gitignore diff --git a/ssl/Makefile-openssl b/development/ssl/Makefile-openssl index 09802ce4..09802ce4 100644 --- a/ssl/Makefile-openssl +++ b/development/ssl/Makefile-openssl diff --git a/ssl/README.md b/development/ssl/README.md index c2819d24..c2819d24 100644 --- a/ssl/README.md +++ b/development/ssl/README.md diff --git a/ssl/gen-certs.sh b/development/ssl/gen-certs.sh index b4f78227..b4f78227 100755 --- a/ssl/gen-certs.sh +++ b/development/ssl/gen-certs.sh diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 4015b08b..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,85 +0,0 @@ -version: "3.5" -services: - zookeeper: - image: wurstmeister/zookeeper - ports: - - "2181:2181" - - kafka: - image: wurstmeister/kafka - ports: - - "9092:9092" - environment: - KAFKA_ADVERTISED_HOST_NAME: "kafka" - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" - volumes: - - /var/run/docker.sock:/var/run/docker.sock - depends_on: - - zookeeper - - consul: - image: progrium/consul - ports: - - "8500:8500" - environment: - - CONSUL_BIND_INTERFACE=eth0 - command: ["-server", "-bootstrap", "-ui-dir", "/ui"] - - ves-hv-collector: - image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest -# build: -# context: hv-collector-main -# dockerfile: Dockerfile - ports: - - "6060:6060" - - "6061:6061/tcp" - entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", - "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] - command: ["--listen-port", "6061", - "--health-check-api-port", "6060", - "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true", - "--key-store-password", "onaponap", - "--trust-store-password", "onaponap"] - healthcheck: - test: curl -f http://localhost:6060/health/ready || exit 1 - interval: 10s - timeout: 3s - retries: 3 - start_period: 20s - depends_on: - - kafka - - consul - volumes: - - ./ssl/:/etc/ves-hv/ - - xnf-simulator: - image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator -# build: -# context: hv-collector-xnf-simulator -# dockerfile: Dockerfile - ports: - - "6062:6062/tcp" - command: ["--listen-port", "6062", - "--ves-host", "ves-hv-collector", - "--ves-port", "6061", - "--key-store-password", "onaponap", - "--trust-store-password", "onaponap"] - depends_on: - - ves-hv-collector - volumes: - - ./ssl/:/etc/ves-hv/ - - dcae-app-simulator: - image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator -# build: -# context: hv-collector-dcae-app-simulator -# dockerfile: Dockerfile - ports: - - "6063:6063/tcp" - command: ["--listen-port", "6063", - "--kafka-bootstrap-servers", "kafka:9092", - "--kafka-topics", "HV_VES_PERF3GPP"] - depends_on: - - kafka diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index dd0111bc..b686b250 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux @@ -35,12 +36,12 @@ interface Metrics { @FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink + operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 3c85a9b1..5584d61d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -23,15 +23,17 @@ import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.* interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Option<Collector> +typealias CollectorProvider = (ClientContext) -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 5c96e1c5..2008fc35 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() + val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() - .map(this::createVesHvCollector) .doOnNext { - logger.info("Using updated configuration for new connections") + logger.info { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error("Failed to acquire configuration from consul") + logger.error { "Failed to acquire configuration from consul" } healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } - .subscribe(collector::set) - return collector::getOption + .subscribe(config::set) + return { ctx: ClientContext -> + config.getOption().map { config -> createVesHvCollector(config, ctx) } + } } - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(config, ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index cee658b6..6105b585 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -20,11 +20,22 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger -class Router(private val routing: Routing) { +class Router(private val routing: Routing, private val ctx: ClientContext) { fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } + routing.routeFor(message.header).map { it(message) }.also { + if (it.isEmpty()) { + logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 4176de99..cf73aed8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -21,19 +21,19 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Either import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -42,28 +42,27 @@ import reactor.core.publisher.Mono * @since May 2018 */ internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, private val sink: Sink, private val metrics: Metrics) : Collector { - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::routeMessage) + .onErrorResume { logger.handleReactiveStreamError(clientContext, it) } + .doFinally { releaseBuffersMemory() } + .then() - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) + .concatMap(wireChunkDecoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux @@ -75,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, + .filterFailedWithLog(logger, clientContext::asMap, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -89,15 +88,15 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, + .filterEmptyWithLog(logger, clientContext::asMap, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - .also { logger.debug("Released buffer memory after handling message stream") } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, predicate) + filterFailedWithLog(logger, clientContext::asMap, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt new file mode 100644 index 00000000..21b79bbe --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import reactor.core.publisher.Flux + +@Suppress("TooManyFunctions") +internal object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) + + fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, + returnFlux: Flux<T> = Flux.empty()): Flux<T> { + return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux) + } +} + diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index cea8a7ee..bbaa47c4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) + logger.withWarn { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index bdce6f73..3fefc6e8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient @@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient .get() .uri(url + createQueryString(queryParams)) @@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) { } } .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } } private fun createQueryString(params: Map<String, Any>): String { @@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) { return builder.removeSuffix("&").toString() } + companion object { + + + private val logger = Logger(HttpAdapter::class) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 5f4bf354..ec8593af 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider { val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) + logger.info(ctx, logMessageSupplier) else - logger.trace(logMessageSupplier) + logger.trace(ctx, logMessageSupplier) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index c4d6c87e..690a7d1e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,6 +20,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -35,7 +38,8 @@ import java.util.concurrent.atomic.AtomicLong * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, + private val ctx: ClientContext) : Sink { private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { @@ -45,17 +49,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM if (it.isSuccessful()) { Mono.just(it) } else { - logger.warn(it.exception()) { "Failed to send message to Kafka" } + logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } Mono.empty<SenderResult<RoutedMessage>>() } } .map { it.correlationMetadata() } - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } + return result.doOnNext(::logSentMessage) } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { + logger.trace(ctx::asMap, Marker.INVOKE) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 18191952..b4f470d4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.apache.kafka.clients.producer.ProducerConfig import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader @@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions * @since June 2018 */ internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) } private fun constructSenderOptions(config: CollectorConfiguration) = diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 0b2997fa..6f02d43e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,6 +23,11 @@ import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle @@ -57,57 +62,64 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> - logger.info("Collector configured with SSL enabled") + logger.info { "Collector configured with SSL enabled" } this.secure { b -> b.sslContext(sslContext) } }.getOrElse { - logger.info("Collector configured with SSL disabled") + logger.info { "Collector configured with SSL disabled" } this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + val clientContext = ClientContext(nettyOutbound.alloc()) + nettyInbound.withConnection { + clientContext.clientAddress = it.address() + } + + logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" } + return collectorProvider(clientContext).fold( + { + logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + Mono.empty() + }, + { + logger.info(clientContext::asMap) { "Handling new connection" } + nettyInbound.withConnection { conn -> + conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) } - ) + it.handleConnection(createDataStream(nettyInbound)) + } + ) + } private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { - logger.info { + logger.info(ctx) { "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } - disconnectClient() + disconnectClient(ctx) } return this } - private fun Connection.disconnectClient() { + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { + logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." } if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } + logger.debug(ctx) { "Channel closed successfully." } else - logger.warn("Channel close failed", it.cause()) + logger.withWarn(ctx) { log("Channel close failed", it.cause()) } } } - private fun Connection.logConnectionClosed(): Connection { + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") + // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) + logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 4a2ef6b2..b735138d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -21,15 +21,17 @@ package org.onap.dcae.collectors.veshv.impl.wire import arrow.effects.IO import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace import reactor.core.publisher.Flux +import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink /** @@ -38,14 +40,14 @@ import reactor.core.publisher.SynchronousSink */ internal class WireChunkDecoder( private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() + private val ctx: ClientContext) { + private val streamBuffer = ctx.alloc.compositeBuffer() fun release() { streamBuffer.release() } - fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { + fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer { logIncomingMessage(byteBuf) if (byteBuf.readableBytes() == 0) { byteBuf.release() @@ -53,7 +55,7 @@ internal class WireChunkDecoder( } else { streamBuffer.addComponent(true, byteBuf) generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } + .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) } .doFinally { streamBuffer.discardReadComponents() } } } @@ -84,15 +86,15 @@ internal class WireChunkDecoder( } private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" } } private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } + logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" } } private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } + logger.trace(ctx) { "End of data in current TCP buffer" } } companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt new file mode 100644 index 00000000..305e4cb1 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +data class ClientContext( + val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, + val clientId: String = UUID.randomUUID().toString(), + var clientAddress: InetSocketAddress? = null) { + fun asMap(): Map<String, String> { + val result = mutableMapOf("clientId" to clientId) + if (clientAddress != null) { + result["clientAddress"] = clientAddress.toString() + } + return result + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 437614ac..ad97a3f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }).also { - if (it.isEmpty()) { - logger.debug { "No route is defined for domain: ${commonHeader.domain}" } - } - } - - companion object { - private val logger = Logger(Routing::class) - } + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e8a31231..e4190163 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some +import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing @@ -56,7 +58,7 @@ object RouterTest : Spek({ withFixedPartitioning() } }.build() - val cut = Router(config) + val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index f06a0dc7..e0092cf9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import reactor.test.test /** @@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc)) fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { diff --git a/sources/hv-collector-core/src/test/resources/logback-test.xml b/sources/hv-collector-core/src/test/resources/logback-test.xml index 9a4eacfe..f4cb6c59 100644 --- a/sources/hv-collector-core/src/test/resources/logback-test.xml +++ b/sources/hv-collector-core/src/test/resources/logback-test.xml @@ -1,35 +1,43 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/>WireFrameCodecsTest + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg%n"/> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> - - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${SIMPLE_LOG_PATTERN}</pattern> + </encoder> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> </root> </configuration> diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 0897e910..ef4ce967 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({ ) val fluxes = (1.rangeTo(runs)).map { - sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + sut.collector.handleConnection(generateDataStream(sut.alloc, params)) } val durationMs = measureTimeMillis { Flux.merge(fluxes).then().block(timeout) @@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - logger.info("Processed $runs connections each containing $numMessages msgs.") - logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info { "Processed $runs connections each containing $numMessages msgs." } + logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({ val dataStream = generateDataStream(sut.alloc, params) .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) - sut.collector.handleConnection(sut.alloc, dataStream) + sut.collector.handleConnection(dataStream) .timeout(timeout) .block() - logger.info("Forwarded ${sink.count} msgs") + logger.info { "Forwarded ${sink.count} msgs" } assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 0495ced5..ce242e0b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState @@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") } companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 @@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) { } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2d81c671..ab59cc2e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ .map { vesWireFrameMessage(PERF3GPP) } - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } diff --git a/sources/hv-collector-ct/src/test/resources/logback-test.xml b/sources/hv-collector-ct/src/test/resources/logback-test.xml index 93f22771..fc80a2f2 100644 --- a/sources/hv-collector-ct/src/test/resources/logback-test.xml +++ b/sources/hv-collector-ct/src/test/resources/logback-test.xml @@ -1,35 +1,52 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${ONAP_LOG_PATTERN}</pattern> + </encoder> + </appender> <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${ONAP_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> </root> </configuration> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 417183fb..f7d94de5 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, throw IllegalArgumentException(message) } - logger.info("Received new configuration. Creating consumer for topics: $topics") + logger.info { "Received new configuration. Creating consumer for topics: $topics" } consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) }.fix() diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 20c0f592..36f30e66 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -61,13 +61,13 @@ class MessageStreamValidation( return messageParams.fold( { logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" } - logger.debug { "Detailed stack trace: ${it}" } + logger.debug { "Detailed stack trace: $it" } throw IllegalArgumentException("Parsing error: " + it.message) }, { if (it.isEmpty()) { val message = "Message param list cannot be empty" - logger.warn(message) + logger.warn { message } throw IllegalArgumentException(message) } it diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index a6ee1122..e54eb359 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .delete("messages") { ctx -> ctx.response.contentType(CONTENT_TEXT) - logger.info("Resetting simulator state") + logger.info { "Resetting simulator state" } ctx.response.sendOrError(simulator.resetState()) } .get("messages/all/count") { ctx -> - logger.info("Processing request for count of received messages") + logger.info { "Processing request for count of received messages" } simulator.state().fold( { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) - logger.warn("Error - number of messages could not be specified") + logger.warn { "Error - number of messages could not be specified" } }, { logger.info { "Returned number of received messages: ${it.messagesCount}" } @@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } .post("messages/all/validate") { ctx -> ctx.request.body.then { body -> - logger.info("Processing request for message validation") + logger.info { "Processing request for message validation" } val response = simulator.validate(body.inputStream) .map { isValid -> if (isValid) { diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 06ff4d59..5856f044 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -43,17 +43,17 @@ fun main(args: Array<String>) = .map(::startApp) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started DCAE-APP Simulator API server") + logger.info { "Started DCAE-APP Simulator API server" } } ) private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml index 4d12b113..ba07c9c4 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml +++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -1,36 +1,95 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="COMPONENT_NAME" + value="hv-ves-dcae-app-simulator"/> + <property name="COMPONENT_SHORT_NAME" + value="dcae-app-simulator"/> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <!--<logger name="reactor.netty" level="DEBUG"/>--> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <!--<logger name="reactor.netty" level="DEBUG"/>--> - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> </root> </configuration>
\ No newline at end of file diff --git a/sources/hv-collector-domain/src/test/resources/logback.xml b/sources/hv-collector-domain/src/test/resources/logback.xml deleted file mode 100644 index 0bf2cb02..00000000 --- a/sources/hv-collector-domain/src/test/resources/logback.xml +++ /dev/null @@ -1,54 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ ============LICENSE_START======================================================= - ~ dcaegen2-collectors-veshv - ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA - ~ ================================================================================ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - ~ ============LICENSE_END========================================================= - --> -<configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> - - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> - - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> -</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-health-check/pom.xml b/sources/hv-collector-health-check/pom.xml index 3e5c6aa0..86c9efc7 100644 --- a/sources/hv-collector-health-check/pom.xml +++ b/sources/hv-collector-health-check/pom.xml @@ -58,6 +58,12 @@ <artifactId>arrow-effects</artifactId> </dependency> <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-test</artifactId> <scope>test</scope> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 899f51fb..5c9566c7 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -40,15 +40,15 @@ fun main(args: Array<String>) = .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.info("Gentle shutdown") } + { logger.info { "Gentle shutdown" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } HealthCheckServer.start(config).bind() VesServer.start(config).bind() .await().bind() diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt index 5c6f1277..13b0bc7b 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger abstract class ServerStarter { fun start(config: ServerConfiguration): IO<ServerHandle> = startServer(config) - .map { logger.info(serverStartedMessage(it)); it } + .map { logger.info { serverStartedMessage(it) }; it } protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> protected abstract fun serverStartedMessage(handle: ServerHandle): String diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index bee0dae1..c88b8aa8 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -1,4 +1,23 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> <property name="COMPONENT_NAME" value="dcae-hv-ves-collector"/> @@ -8,24 +27,53 @@ <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> <property name="ARCHIVE" value="${LOG_PATH}/archive"/> - <property name="FILE_LOG_PATTERN" value=" -%nopexception%50.50logger -| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} -| %highlight(%-5level) -| %msg -| %rootException -| %thread%n"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> + <pattern>${READABLE_LOG_PATTERN}</pattern> </encoder> </appender> <appender name="ROLLING-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> + <pattern>${ONAP_LOG_PATTERN}</pattern> </encoder> <file>${LOG_PATH}/${LOG_FILENAME}.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> diff --git a/sources/hv-collector-ssl/src/test/resources/logback-test.xml b/sources/hv-collector-ssl/src/test/resources/logback-test.xml index 9a4eacfe..400b1259 100644 --- a/sources/hv-collector-ssl/src/test/resources/logback-test.xml +++ b/sources/hv-collector-ssl/src/test/resources/logback-test.xml @@ -1,35 +1,43 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg%n"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> + <pattern>${SIMPLE_LOG_PATTERN}</pattern> </encoder> </appender> - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> + <appender-ref ref="CONSOLE"/> + </root> </configuration> diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt index d017b31b..6ca28a56 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt @@ -31,7 +31,7 @@ import java.time.Duration private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils") object Assertions : org.assertj.core.api.Assertions() { - fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual) + fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual) } @@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) { while (tryNum <= retries) { tryNum++ try { - logger.debug("Try number $tryNum") + logger.debug { "Try number $tryNum" } action() break } catch (ex: Throwable) { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt index 5a733f24..a25b2912 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) { response.attempt().unsafeRunSync().fold( { err -> - logger.warn("Error occurred. Sending .", err) + logger.withWarn { log("Error occurred. Sending .", err) } val message = err.message send(errorResponse(message)) }, diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 033dd5e5..1e5c9c55 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -21,117 +21,214 @@ package org.onap.dcae.collectors.veshv.utils.logging import kotlin.reflect.KClass import org.slf4j.LoggerFactory +import org.slf4j.MDC + +typealias MappedDiagnosticContext = () -> Map<String, String> @Suppress("TooManyFunctions", "SuboptimalLoggerUsage") -class Logger(val logger: org.slf4j.Logger) { +class Logger(logger: org.slf4j.Logger) { constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java)) constructor(name: String) : this(LoggerFactory.getLogger(name)) - // - // TRACE - // + private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger + private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger + private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger + private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger + private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger - val traceEnabled: Boolean - get() = logger.isTraceEnabled + // ERROR - fun trace(messageProvider: () -> String) { - if (logger.isTraceEnabled) { - logger.trace(messageProvider()) - } + fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block() + + fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + errorLogger.withMdc(mdc, block) + + fun error(message: () -> String) = errorLogger.run { + log(message()) } - // + fun error(mdc: MappedDiagnosticContext, message: () -> String) = + errorLogger.withMdc(mdc) { log(message()) } + + fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + errorLogger.withMdc(mdc) { log(marker, message()) } + + // WARN + + fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() + + fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + warnLogger.withMdc(mdc, block) + + fun warn(message: () -> String) = warnLogger.run { + log(message()) + } + + fun warn(mdc: MappedDiagnosticContext, message: () -> String) = + warnLogger.withMdc(mdc) { log(message()) } + + fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + warnLogger.withMdc(mdc) { log(marker, message()) } + + // INFO + + fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() + + fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + infoLogger.withMdc(mdc, block) + + fun info(message: () -> String) = infoLogger.run { + log(message()) + } + + fun info(mdc: MappedDiagnosticContext, message: () -> String) = + infoLogger.withMdc(mdc) { log(message()) } + + fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + infoLogger.withMdc(mdc) { log(marker, message()) } + // DEBUG - // - fun debug(message: String) { - logger.debug(message) + fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() + + fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + debugLogger.withMdc(mdc, block) + + fun debug(message: () -> String) = debugLogger.run { + log(message()) } - fun debug(message: String, t: Throwable) { - logger.debug(message, t) + fun debug(mdc: MappedDiagnosticContext, message: () -> String) = + debugLogger.withMdc(mdc) { log(message()) } + + fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + debugLogger.withMdc(mdc) { log(marker, message()) } + + // TRACE + + fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block() + + fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = + traceLogger.withMdc(mdc, block) + + fun trace(message: () -> String) = traceLogger.run { + log(message()) } - fun debug(messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider()) + fun trace(mdc: MappedDiagnosticContext, message: () -> String) = + traceLogger.withMdc(mdc) { log(message()) } + + fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + traceLogger.withMdc(mdc) { log(marker, message()) } + +} + +abstract class AtLevelLogger { + abstract fun log(message: String) + abstract fun log(message: String, t: Throwable) + abstract fun log(marker: Marker, message: String) + open val enabled: Boolean + get() = true + + inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) { + if (enabled) { + try { + MDC.setContextMap(mdc()) + block() + } finally { + MDC.clear() + } } } +} - fun debug(t: Throwable, messageProvider: () -> String) { - if (logger.isDebugEnabled) { - logger.debug(messageProvider(), t) - } +object OffLevelLogger : AtLevelLogger() { + override val enabled = false + + override fun log(message: String) { + // do not log anything } - // - // INFO - // - fun info(message: String) { - logger.info(message) + override fun log(message: String, t: Throwable) { + // do not log anything } - fun info(messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider()) - } + override fun log(marker: Marker, message: String) { + // do not log anything } +} - fun info(message: String, t: Throwable) { - logger.info(message, t) +@Suppress("SuboptimalLoggerUsage") +class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.error(message) } - fun info(t: Throwable, messageProvider: () -> String) { - if (logger.isInfoEnabled) { - logger.info(messageProvider(), t) - } + override fun log(message: String, t: Throwable) { + logger.error(message, t) } - // - // WARN - // + override fun log(marker: Marker, message: String) { + logger.error(marker(), message) + } +} - fun warn(message: String) { +@Suppress("SuboptimalLoggerUsage") +class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { logger.warn(message) } - fun warn(message: String, t: Throwable) { + override fun log(message: String, t: Throwable) { logger.warn(message, t) } - fun warn(messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider()) - } + override fun log(marker: Marker, message: String) { + logger.warn(marker(), message) } +} - fun warn(t: Throwable, messageProvider: () -> String) { - if (logger.isWarnEnabled) { - logger.warn(messageProvider(), t) - } +@Suppress("SuboptimalLoggerUsage") +class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.info(message) } - // - // ERROR - // + override fun log(message: String, t: Throwable) { + logger.info(message, t) + } - fun error(message: String) { - logger.error(message) + override fun log(marker: Marker, message: String) { + logger.info(marker(), message) } +} - fun error(message: String, t: Throwable) { - logger.error(message, t) +@Suppress("SuboptimalLoggerUsage") +class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.debug(message) } - fun error(messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider()) - } + override fun log(message: String, t: Throwable) { + logger.debug(message, t) } - fun error(t: Throwable, messageProvider: () -> String) { - if (logger.isErrorEnabled) { - logger.error(messageProvider(), t) - } + override fun log(marker: Marker, message: String) { + logger.debug(marker(), message) + } +} + +@Suppress("SuboptimalLoggerUsage") +class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { + override fun log(message: String) { + logger.trace(message) + } + + override fun log(message: String, t: Throwable) { + logger.trace(message, t) + } + + override fun log(marker: Marker, message: String) { + logger.trace(marker(), message) } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt new file mode 100644 index 00000000..83fb9a5e --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +import org.slf4j.MarkerFactory + +enum class Marker(private val marker: org.slf4j.Marker) { + ENTRY(MarkerFactory.getMarker("ENTRY")), + EXIT(MarkerFactory.getMarker("EXIT")), + INVOKE(MarkerFactory.getMarker("INVOKE")); + + operator fun invoke() = marker +} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index e8ec2549..95590d9d 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -25,42 +25,48 @@ import arrow.core.Try import reactor.core.publisher.Flux import reactor.core.publisher.Mono -fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") - logger.debug("Detailed stack trace", ex) +fun <T> Logger.handleReactiveStreamError( + context: MappedDiagnosticContext, + ex: Throwable, + returnFlux: Flux<T> = Flux.empty()): Flux<T> { + warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" } + withDebug(context) { log("Detailed stack trace", ex) } return returnFlux } - fun <T> Try<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: (Throwable) -> String): Flux<T> = - fold({ - logger.warn(rejectedMsg(it)) + fold({ ex -> + logger.withWarn(context) { log(rejectedMsg(ex)) } Flux.empty<T>() - }, { - logger.trace { acceptedMsg(it) } - Flux.just(it) + }, { obj -> + logger.trace(context) { acceptedMsg(obj) } + Flux.just(obj) }) fun <T> Option<T>.filterEmptyWithLog(logger: Logger, + context: MappedDiagnosticContext, acceptedMsg: (T) -> String, rejectedMsg: () -> String): Flux<T> = fold({ - logger.warn(rejectedMsg) + logger.warn(context, rejectedMsg) Flux.empty<T>() }, { - logger.trace { acceptedMsg(it) } + logger.trace(context) { acceptedMsg(it) } Flux.just(it) }) -fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = +fun <T> Flux<T>.filterFailedWithLog(logger: Logger, + context: MappedDiagnosticContext, + predicate: (T) -> Either<() -> String, () -> String>) = flatMap { t -> predicate(t).fold({ - logger.warn(it) + logger.warn(context, it) Mono.empty<T>() }, { - logger.trace(it) + logger.trace(context, it) Mono.just<T>(t) }) } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt index c27fb8c8..10fc8d8f 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt @@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it object LoggerTest : Spek({ lateinit var slf4jLogger: org.slf4j.Logger - lateinit var cut: Logger + fun cut() = Logger(slf4jLogger).also { + verify(slf4jLogger).isTraceEnabled + verify(slf4jLogger).isDebugEnabled + verify(slf4jLogger).isInfoEnabled + verify(slf4jLogger).isWarnEnabled + verify(slf4jLogger).isErrorEnabled + } beforeEachTest { slf4jLogger = mock() - cut = Logger(slf4jLogger) } afterEachTest { @@ -50,28 +55,19 @@ object LoggerTest : Spek({ val exception = Exception("fail") describe("debug levels") { - it("should log message") { - cut.debug(message) - verify(slf4jLogger).debug(message) - } - - it("should log message with exception") { - cut.debug(message, exception) - verify(slf4jLogger).debug(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug { message } + cut().debug { message } verify(slf4jLogger).isDebugEnabled } } @@ -80,42 +76,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isDebugEnabled).thenReturn(true) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled verify(slf4jLogger).debug(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isDebugEnabled).thenReturn(false) - cut.debug(exception) { message } + cut().withDebug { log(message, exception) } verify(slf4jLogger).isDebugEnabled } } } describe("info levels") { - it("should log message") { - cut.info(message) - verify(slf4jLogger).info(message) - } - - it("should log message with exception") { - cut.info(message, exception) - verify(slf4jLogger).info(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info { message } + cut().info { message } verify(slf4jLogger).isInfoEnabled } } @@ -124,42 +111,32 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isInfoEnabled).thenReturn(true) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled verify(slf4jLogger).info(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isInfoEnabled).thenReturn(false) - cut.info(exception) { message } + cut().withInfo { log(message, exception) } verify(slf4jLogger).isInfoEnabled } } } describe("warning levels") { - it("should log message") { - cut.warn(message) - verify(slf4jLogger).warn(message) - } - - it("should log message with exception") { - cut.warn(message, exception) - verify(slf4jLogger).warn(message, exception) - } - describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn { message } + cut().warn { message } verify(slf4jLogger).isWarnEnabled } } @@ -168,42 +145,33 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isWarnEnabled).thenReturn(true) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled verify(slf4jLogger).warn(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isWarnEnabled).thenReturn(false) - cut.warn(exception) { message } + cut().withWarn { log(message, exception) } verify(slf4jLogger).isWarnEnabled } } } describe("error levels") { - it("should log message") { - cut.error(message) - verify(slf4jLogger).error(message) - } - - it("should log message with exception") { - cut.error(message, exception) - verify(slf4jLogger).error(message, exception) - } describe("lazy logging message") { it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error { message } + cut().error { message } verify(slf4jLogger).isErrorEnabled } } @@ -212,14 +180,14 @@ object LoggerTest : Spek({ it("should log when debug is ON") { whenever(slf4jLogger.isErrorEnabled).thenReturn(true) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled verify(slf4jLogger).error(message, exception) } it("should not log when debug is OFF") { whenever(slf4jLogger.isErrorEnabled).thenReturn(false) - cut.error(exception) { message } + cut().withError { log(message, exception) } verify(slf4jLogger).isErrorEnabled } } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt index 0f359df3..da956bec 100644 --- a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt @@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({ val cut = Try.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({ val e = Exception() val cut = Failure(e) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) .test() .verifyComplete() } @@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({ val cut = Option.just(event) it("should not filter stream event and log accepted message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .expectNext(event) .verifyComplete() @@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({ given("empty Option") { val cut = Option.empty<Int>() it("should filter stream event and log rejected message") { - cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE) .test() .verifyComplete() } @@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should not filter stream event and log accepted message") { - cut.filterFailedWithLog(logger, right()) + cut.filterFailedWithLog(logger,::emptyMap, right()) .test() .expectNext(event) .verifyComplete() @@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({ val cut = Flux.just(event) it("should filter stream event and log rejected message") { - cut.filterFailedWithLog(logger, left()) + cut.filterFailedWithLog(logger,::emptyMap, left()) .test() .verifyComplete() } diff --git a/sources/hv-collector-utils/src/test/resources/logback-test.xml b/sources/hv-collector-utils/src/test/resources/logback-test.xml index 9a4eacfe..400b1259 100644 --- a/sources/hv-collector-utils/src/test/resources/logback-test.xml +++ b/sources/hv-collector-utils/src/test/resources/logback-test.xml @@ -1,35 +1,43 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg%n"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> + <pattern>${SIMPLE_LOG_PATTERN}</pattern> </encoder> </appender> - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> + <appender-ref ref="CONSOLE"/> + </root> </configuration> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 57aaf3db..ca6d169a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .handle { _, output -> handler(complete, messages, output) } .connect() .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") + logger.info { + "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" + } } return complete.then() } @@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .options { it.flushOnBoundary() } .sendGroups(frames) .then { - logger.info("Messages have been sent") + logger.info { "Messages have been sent" } complete.onComplete() } .then() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index 16019384..cfd3a6e9 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -59,17 +59,17 @@ internal class XnfApiServer( .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) .get("healthcheck") { ctx -> - logger.info("Checking health") + logger.info { "Checking health" } ctx.response.status(HttpConstants.STATUS_OK).send() } } private fun startSimulationHandler(ctx: Context) { - logger.info("Attempting to start asynchronous scenario") + logger.info { "Attempting to start asynchronous scenario" } ctx.request.body.then { body -> val id = startSimulation(body) when (id) { - is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"} + is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" } is Either.Right -> logger.info { "Scenario started, details: ${id.b}" } } ctx.response.sendEitherErrorOrResponse(id) @@ -83,7 +83,7 @@ internal class XnfApiServer( } private fun simulatorStatusHandler(ctx: Context) { - logger.debug("Checking task status") + logger.debug { "Checking task status" } val id = UUID.fromString(ctx.pathTokens["id"]) logger.debug { "Checking status for id: $id" } val status = ongoingSimulations.status(id) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index 21748ae8..d7d42d88 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> result.fold( { err -> - logger.warn("Error", err) + logger.withWarn { log("Error", err) } simulations[id] = StatusFailure(err) }, { - logger.info("Finished sending messages") + logger.info { "Finished sending messages" } simulations[id] = StatusSuccess } ) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 4512dfbf..91070d35 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) .map { config -> - logger.info("Using configuration: $config") + logger.info { "Using configuration: $config" } val xnfSimulator = XnfSimulator( VesHvClient(config), MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) @@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) } .unsafeRunEitherSync( { ex -> - logger.error("Failed to start a server", ex) + logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, { - logger.info("Started xNF Simulator API server") + logger.info { "Started xNF Simulator API server" } } ) diff --git a/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml index 809f62d4..2bc3f978 100644 --- a/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml +++ b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml @@ -1,35 +1,94 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="COMPONENT_NAME" + value="hv-ves-xnf-simulator"/> + <property name="COMPONENT_SHORT_NAME" + value="xnf-simulatr"/> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> </root> </configuration>
\ No newline at end of file |