diff options
33 files changed, 338 insertions, 139 deletions
diff --git a/development/bin/constants.sh b/development/bin/constants.sh new file mode 100755 index 00000000..f0df9b00 --- /dev/null +++ b/development/bin/constants.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2019 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +DCAE_APP_HOSTNAME=localhost +DCAE_APP_PORT=6064 +DCAE_APP_ADDRESS=${DCAE_APP_HOSTNAME}:${DCAE_APP_PORT}
\ No newline at end of file diff --git a/development/bin/dcae-msgs.sh b/development/bin/dcae-msgs.sh index cb05a8c3..964be14f 100755 --- a/development/bin/dcae-msgs.sh +++ b/development/bin/dcae-msgs.sh @@ -56,9 +56,12 @@ while getopts "$optspec" arg; do done shift $((OPTIND-1)) +DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0")) +source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh + if [ -n "${VERBOSE+x}" ]; then echo "All messages count currently consumed by dcae app simulator: " fi -curl --request GET localhost:6063/messages/all/count +curl --request GET ${DCAE_APP_ADDRESS}/messages/all/count echo diff --git a/development/bin/dcae-reset.sh b/development/bin/dcae-reset.sh index e5b7b056..03baf97a 100755 --- a/development/bin/dcae-reset.sh +++ b/development/bin/dcae-reset.sh @@ -57,9 +57,12 @@ while getopts "$optspec" arg; do done shift $((OPTIND-1)) +DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0")) +source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh + if [ -n "${VERBOSE+x}" ]; then - echo "Requesting DCAE app running on port 6063 to reset messages count" + echo "Requesting DCAE app running on port ${DCAE_APP_PORT} to reset messages count" fi -curl --request DELETE localhost:6063/messages +curl --request DELETE ${DCAE_APP_ADDRESS}/messages echo diff --git a/development/bin/dcae-topic.sh b/development/bin/dcae-topic.sh index 8c176221..aefb1d0b 100755 --- a/development/bin/dcae-topic.sh +++ b/development/bin/dcae-topic.sh @@ -56,11 +56,14 @@ while getopts "$optspec" arg; do done shift $((OPTIND-1)) +DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0")) +source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh + TOPIC=${1:-HV_VES_PERF3GPP} if [ -n "${VERBOSE+x}" ]; then - echo "Requesting DCAE app running on port 6063 to consume messages from topic: ${TOPIC}" + echo "Requesting DCAE app running on ${DCAE_APP_ADDRESS} to consume messages from topic: ${TOPIC}" fi -curl --request PUT localhost:6063/configuration/topics -d ${TOPIC} +curl --request PUT ${DCAE_APP_ADDRESS}/configuration/topics -d ${TOPIC} echo
\ No newline at end of file diff --git a/development/bin/run-xnf-simulator.sh b/development/bin/run-xnf-simulator.sh index d8de0097..e4d8d94a 100755 --- a/development/bin/run-xnf-simulator.sh +++ b/development/bin/run-xnf-simulator.sh @@ -21,7 +21,13 @@ set -euo pipefail usage() { echo "Start xnf-simulator container on given port and inside of given docker-network" - echo "Usage: $0 [-h|--help] [-v|--verbose] <xnf listen port> [<hv ves docker network>]" + echo "Usage: $0 [-h|--help] [-v|--verbose] [--ssl-disable] <xnf listen port> [<hv ves hostname> <hv ves port> <hv ves docker network>]" + echo "" + echo "Optional parameters:" + echo " - ssl-disable : Should xNF simulator be configured without using SSL/TLS connections" + echo "Default values:" + echo " - hv ves hostname: ves-hv-collector" + echo " - hv ves port: 6061" exit 1 } @@ -32,6 +38,8 @@ while getopts "$optspec" arg; do case "${OPTARG}" in verbose) VERBOSE=True ;; + ssl-disable) + SSL_DISABLE=True ;; help) usage ;; *) @@ -53,27 +61,42 @@ shift $((OPTIND-1)) LISTEN_PORT=$1 -if [ $# -gt 1 ]; then - HV_VES_NETWORK=${2} +HV_VES_HOSTNAME=${2:-ves-hv-collector} +HV_VES_PORT=${3:-6061} +if [ $# -gt 3 ]; then + HV_VES_NETWORK=${4} fi PORTS="${LISTEN_PORT}:${LISTEN_PORT}/tcp" HV_VES_REPO_HOME=$(realpath $(dirname "$0"))/.. +if [ -n "${SSL_DISABLE+x}" ]; then + SSL_CONFIGURATION="--ssl-disable" +else + SSL_CONFIGURATION="--key-store-password onaponap --trust-store-password onaponap" +fi + if [ -n "${VERBOSE+x}" ]; then - echo "Starting xnf-simulator with ports configuration: ${PORTS}" + echo "Starting xnf-simulator with " + echo " - ports configuration: ${PORTS}" + echo " - SSL configuration: ${SSL_CONFIGURATION}" echo "Container id:" fi + XNF_CONTAINER_ID=$(docker run -d \ -v ${HV_VES_REPO_HOME}/ssl/:/etc/ves-hv/ \ + --health-cmd='curl -s -f http://localhost:6063/health/ready || exit 1' \ + --health-interval=5s \ + --health-retries=3 \ + --health-start-period='10s' \ -p ${PORTS} \ onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator \ --listen-port ${LISTEN_PORT} \ - --ves-host ves-hv-collector \ - --ves-port 6061 \ - --key-store-password onaponap \ - --trust-store-password onaponap) + --health-check-api-port 6063 \ + --ves-host ${HV_VES_HOSTNAME} \ + --ves-port ${HV_VES_PORT} \ + ${SSL_CONFIGURATION}) echo $XNF_CONTAINER_ID diff --git a/development/bin/start-simulation.sh b/development/bin/start-simulation.sh index beede920..8c63ddbb 100755 --- a/development/bin/start-simulation.sh +++ b/development/bin/start-simulation.sh @@ -23,21 +23,25 @@ set -euo pipefail function usage() { echo "" echo "Send messages to hv-ves from multiple xNF simulators" - echo "Usage: $0 [-h|--help] [-v|--verbose] [--messages-in-batch] [--docker-network] [--xnf-logs-directory]" + echo "Usage: $0 [-h|--help] [-v|--verbose] [--ssl-disable]" + echo " [--messages-in-batch=ARG] [--docker-network=ARG] [--xnf-logs-directory=ARG]" echo " <hv ves hostname> <hv ves port> <simulators amount> <messages batches amount per simulator> <messages sending interval>" echo "" echo " - hv ves hostname : HighVolume VES Collector network hostname" echo " - hv ves port : HighVolume VES Collector network port" echo " - simulators amount : Amount of xNF simulators to be launched" - echo " - messages amount per simulator : Amount of messages to be sent from each xNF simulator to HV-VES" - echo " - messages sending interval : interval in seconds between sending messages from xNFs" + echo " - messages batches amount per simulator : Amount of batches of messages to be sent from each xNF simulator to HV-VES" + echo " - messages sending interval : interval in seconds between sending batches of messages from xNFs" echo "Optional parameters:" + echo " - ssl-disable : Should xNF simulator be configured without using SSL/TLS connections" echo " - messages-in-batch : Amount of messages sent on each request" echo " - docker-network : Docker network to which xNF simulators should be added" echo " - xnf-logs-directory : Path to directory where logs from all xNF simulators should be stored" echo "Example invocations:" echo "./start-simulation.sh --messages-in-batch=5 --docker-network=development_default ves-hv-collector 6061 10 20 0.5" echo "./start-simulation.sh --messages-in-batch=5 --xnf-logs-directory=/tmp/xnf-simulation localhost 6061 10 20 0.5" + echo "Invocation with remote HV-VES host (Kubernetes slave IP given with default K8S NodePort for HV-VES service):" + echo "./start-simulation.sh --ssl-disable --xnf-logs-directory=/tmp/xnf-simulation 10.183.36.78 30222 5 100 5" exit 1 } @@ -56,10 +60,12 @@ function create_logs_dir() { } function create_xNFs_simulators() { + echo "Creating ${XNFS_AMOUNT} xNFs simulators" + [ -n "${SSL_DISABLE+x}" ] && verbose_log "--ssl-disable flag will be set inside containers." for i in $(seq 1 ${XNFS_AMOUNT}); do local XNF_PORT=$(get_unoccupied_port 32000 65000) verbose_log "Starting xNF simulator container on port ${XNF_PORT} using run-xnf-simulator script" - XNF_CONTAINER_ID=$(${DEVELOPMENT_BIN_DIRECTORY}/run-xnf-simulator.sh $XNF_PORT ${DOCKER_NETWORK:-}) + XNF_CONTAINER_ID=$(${DEVELOPMENT_BIN_DIRECTORY}/run-xnf-simulator.sh ${SSL_DISABLE} $XNF_PORT ${HV_VES_HOSTNAME} ${HV_VES_PORT} ${DOCKER_NETWORK:-}) CREATED_XNF_SIMULATORS_PORTS+=(${XNF_PORT}) verbose_log "Container id: ${XNF_CONTAINER_ID}" CREATED_XNF_SIMULATORS_IDS+=(${XNF_CONTAINER_ID}) @@ -80,19 +86,20 @@ function get_unoccupied_port() { } function wait_for_containers_startup_or_fail() { - local seconds_to_wait=10 + local intervals_amount=30 + local wait_interval=5 local all_containers_healthy=1 - verbose_log "Waiting ${seconds_to_wait}s for containers startup" + verbose_log "Waiting up to ${intervals_amount} times with interval of ${wait_interval}s for containers startup" set +e - for i in $(seq 1 ${seconds_to_wait}); do + for i in $(seq 1 ${intervals_amount}); do verbose_log "Try no. ${i}" all_containers_healthy=1 - for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do - verbose_log "Checking container on port ${port}" - local status_code=$(curl -s -o /dev/null -I -w "%{http_code}" localhost:${port}/healthcheck) - if [ $status_code -ne 200 ]; then - verbose_log "Container on port ${port} is unhealthy " + for id in ${CREATED_XNF_SIMULATORS_IDS[@]}; do + verbose_log "Checking container with id ${id}" + health=$(docker inspect --format='{{json .State.Health.Status}}' ${id}) + if [ ${health} != "\"healthy\"" ]; then + verbose_log "Container ${id} is not in healthy state. Actual status: ${health}" all_containers_healthy=0 break fi @@ -100,7 +107,8 @@ function wait_for_containers_startup_or_fail() { if [ $all_containers_healthy -eq 1 ]; then break fi - sleep 1 + verbose_log "Sleeping for ${wait_interval}s" + sleep $wait_interval done set -e @@ -113,8 +121,8 @@ function wait_for_containers_startup_or_fail() { } function start_simulation() { - verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves - ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}" + verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves ( running on + ${HV_VES_HOSTNAME}:${HV_VES_PORT} ) ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}s" for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do start_single_simulation $port $MESSAGES_IN_BATCH & done @@ -144,11 +152,12 @@ function wait_for_simulators_to_finish_sending_messages() { for i in $(seq 1 ${seconds_to_wait}); do verbose_log "Wait no. ${i}" all_containers_finished=1 - for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do - local container_status=$(curl --request GET -s localhost:${port}/healthcheck | jq -r '.["Detailed status"]') + for id in ${CREATED_XNF_SIMULATORS_IDS[@]}; do + verbose_log "Checking container ${id}" + local container_status=$(docker inspect --format='{{json .State.Health.Log }}' ${id} | jq '.[-1] | .Output') - verbose_log "Container on port ${port} status: ${container_status}" - if [ "${container_status}" = "Busy" ]; then + verbose_log "Container ${id} status: ${container_status}" + if [ "${container_status}" != "\"UP\\nNo simulation is in progress at the moment\"" ]; then all_containers_finished=0 break fi @@ -157,8 +166,18 @@ function wait_for_simulators_to_finish_sending_messages() { echo "All containers finished sending messages" break fi + verbose_log "Sleeping for 1s" sleep 1 done + + + if [ $all_containers_finished -ne 1 ]; then + echo "[ERROR] Some xNFs simulators failed to finish sending messages - simulation probably failed" + echo "For debug output rerun simulation with -v and --xnf-logs-directory command line options" + cleanup + echo "Exitting..." + exit 3 + fi } function cleanup() { @@ -170,7 +189,7 @@ function cleanup() { if [ -n "${XNF_LOGS_DIRECTORY+x}" ]; then local log_file=${XNF_LOGS_DIRECTORY}/${container_id}.log verbose_log "Writing container logs to: ${log_file}" - docker logs ${container_id} > $log_file + docker logs ${container_id} &> $log_file fi verbose_log "Removing container: ${container_id}" docker rm $container_id > /dev/null @@ -202,6 +221,8 @@ while getopts "$optspec" arg; do case "${OPTARG}" in verbose) VERBOSE=True ;; + ssl-disable) + SSL_DISABLE="--ssl-disable" ;; help) usage ;; *) @@ -239,14 +260,14 @@ MESSAGE_BATCHES_AMOUNT=${4} MESSAGES_SENDING_INTERVAL=${5} # set defaults if absent -[ -z "${MESSAGES_IN_BATCH}" ] && MESSAGES_IN_BATCH=1 +[ -z "${MESSAGES_IN_BATCH+x}" ] && MESSAGES_IN_BATCH=1 +[ -z "${SSL_DISABLE+x}" ] && SSL_DISABLE="" create_logs_dir CREATED_XNF_SIMULATORS_PORTS=() CREATED_XNF_SIMULATORS_IDS=() -echo "Creating ${XNFS_AMOUNT} xNFs simulators" trap cleanup SIGINT SIGTERM create_xNFs_simulators @@ -259,6 +280,7 @@ assure_all_xNFs_requests_were_sent assumed_message_sending_time=$(echo ";0.00025 * $XNFS_AMOUNT" | bc) seconds_to_wait=$(echo ";$assumed_message_sending_time * $MESSAGE_BATCHES_AMOUNT * $MESSAGES_IN_BATCH" | bc) +seconds_to_wait=$(echo ";if($seconds_to_wait > 2) $seconds_to_wait else 2" | bc) wait_for_simulators_to_finish_sending_messages $seconds_to_wait # there might be network lag between moment when xNF finished sending messages and they actually are received by hv-ves # thus we cannot start removing xNFs immediately to prevent closing socket channels diff --git a/development/docker-compose.yml b/development/docker-compose.yml index adf8947d..284199d0 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -67,7 +67,9 @@ services: "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", "--kafka-bootstrap-servers", "message-router-kafka:9092", "--key-store-password", "onaponap", - "--trust-store-password", "onaponap"] + "--trust-store-password", "onaponap", + "--first-request-delay", "2", + "--log-level", "DEBUG"] environment: JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid" healthcheck: @@ -91,12 +93,20 @@ services: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator ports: - "6062:6062/tcp" + - "6063:6063" command: ["--listen-port", "6062", + "--health-check-api-port", "6063", "--ves-host", "ves-hv-collector", "--ves-port", "6061", "--key-store", "/etc/ves-hv/client.p12", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] + healthcheck: + test: curl -f http://localhost:6063/health/ready || exit 1 + interval: 10s + timeout: 3s + retries: 3 + start_period: 10s depends_on: - ves-hv-collector volumes: @@ -105,8 +115,8 @@ services: dcae-app-simulator: image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator ports: - - "6063:6063/tcp" - command: ["--listen-port", "6063", + - "6064:6064/tcp" + command: ["--listen-port", "6064", "--kafka-bootstrap-servers", "message-router-kafka:9092", "--kafka-topics", "HV_VES_PERF3GPP"] depends_on: diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fe2b89d5..861065c1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -56,7 +56,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, } .doOnError { logger.error { "Failed to acquire configuration from consul" } - healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 87399caf..d58cc792 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -56,7 +56,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf()) private val retry = retrySpec.doOnRetry { logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } constructor(http: HttpAdapter, diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index a92d3763..ccae3c99 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -39,8 +39,6 @@ import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier import java.time.Duration -import java.util.* -import kotlin.test.assertEquals /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -108,7 +106,7 @@ internal object ConsulConfigurationProviderTest : Spek({ it("should update the health state") { StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) .verifyComplete() } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 338c3734..75e7cf0e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -314,7 +314,7 @@ object VesHvSpecification : Spek({ it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) .describedAs("application health state") - .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index e54eb359..88e01c23 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import ratpack.handling.Chain import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import java.net.InetSocketAddress /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -52,11 +53,13 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } - fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = + fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> = simulator.listenToTopics(kafkaTopics).map { RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(::setupHandlers) + server.serverConfig( + ServerConfig.embedded() + .port(socketAddress.port) + ).handlers(::setupHandlers) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index 17eeb5b1..54fd6f44 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue +import java.net.InetSocketAddress class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { override val cmdLineOptionsList: List<CommandLineOption> = listOf( @@ -59,7 +60,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration .bind() DcaeAppSimConfiguration( - listenPort, + InetSocketAddress(listenPort), maxPayloadSizeBytes, kafkaBootstrapServers, kafkaTopics) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt index a6fc8053..2b0382ac 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config +import java.net.InetSocketAddress + data class DcaeAppSimConfiguration( - val apiPort: Int, + val apiAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, val kafkaBootstrapServers: String, val kafkaTopics: Set<String> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 5856f044..abf60b0d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -57,6 +57,6 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) - .start(config.apiPort, config.kafkaTopics) + .start(config.apiAddress, config.kafkaTopics) .unit() } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt index 7137fe12..055ca19f 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt @@ -54,7 +54,7 @@ internal class ArgDcaeAppSimConfigurationTest : Spek({ } it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt()) } @@ -79,7 +79,7 @@ internal class ArgDcaeAppSimConfigurationTest : Spek({ } it("should set proper port") { - assertThat(result.apiPort).isEqualTo(listenPort.toInt()) + assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt()) } it("should set proper kafka bootstrap servers") { diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt index 8c69406c..4758fb6b 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt @@ -25,8 +25,11 @@ package org.onap.dcae.collectors.veshv.healthcheck.api * @since August 2018 */ enum class HealthDescription(val message: String, val status: HealthStatus) { + STARTING("Component is starting", HealthStatus.OUT_OF_SERVICE), HEALTHY("Healthy", HealthStatus.UP), - STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE), - RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE), - CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN) + BUSY("Processing at least one request", HealthStatus.UP), + IDLE("No simulation is in progress at the moment", HealthStatus.UP), + /* Configuration related */ + RETRYING_FOR_DYNAMIC_CONFIGURATION("Dynamic configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE), + DYNAMIC_CONFIGURATION_NOT_FOUND("Dynamic configuration not found", HealthStatus.DOWN) } diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index aead71bb..7aade34b 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -59,7 +59,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) = healthDescription.get().run { - logger.debug { "HV-VES status: $status, $message" } + logger.debug { "Component status: $status, $message" } resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message)) } diff --git a/sources/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/sources/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt index e3fced2d..9a0fe2f3 100644 --- a/sources/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt +++ b/sources/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt @@ -35,17 +35,17 @@ object HealthStateProviderImplTest : Spek({ val healthStateProviderImpl = HealthStateImpl() on("health state update") { healthStateProviderImpl.changeState(HealthDescription.HEALTHY) - healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) it("should push new health state to the subscriber") { StepVerifier .create(healthStateProviderImpl().take(4)) .expectNext(HealthDescription.HEALTHY) - .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + .expectNext(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) .verifyComplete() } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index c29c5d16..16da3721 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.registerShutdownHook private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv" private val logger = Logger("$VESHV_PACKAGE.main") @@ -44,7 +45,7 @@ fun main(args: Array<String>) = logger.withError { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.info { "Gentle shutdown" } } + { logger.info { "Finished" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = @@ -52,7 +53,8 @@ private fun startAndAwaitServers(config: ServerConfiguration) = Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) logger.info { "Using configuration: $config" } HealthCheckServer.start(config).bind() - VesServer.start(config).bind() - .await().bind() + VesServer.start(config).bind().run { + registerShutdownHook(shutdown()).bind() + await().bind() + } }.fix() - diff --git a/sources/hv-collector-main/src/main/scripts/entry.sh b/sources/hv-collector-main/src/main/scripts/entry.sh index 2e8cb0c5..a612e393 100755 --- a/sources/hv-collector-main/src/main/scripts/entry.sh +++ b/sources/hv-collector-main/src/main/scripts/entry.sh @@ -2,4 +2,19 @@ set -euo pipefail -java ${JAVA_OPTS:-''} -cp '*:' org.onap.dcae.collectors.veshv.main.MainKt $@ +pid=-1 + +function handle_sigterm() { + if [[ ${pid} -ge 0 ]]; then + echo "Caught SIGTERM signal. Redirecting to process with pid=${pid}" + kill -TERM "${pid}" + wait ${pid} + fi + exit 143 # 128 + 15 -- SIGTERM +} +trap "handle_sigterm" SIGTERM + +java ${JAVA_OPTS:-} -cp '*:' org.onap.dcae.collectors.veshv.main.MainKt $@ & +pid=$! +echo "Service started with pid=${pid}" +wait ${pid} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index 05d13094..3c2c64ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -46,10 +46,9 @@ object ExitSuccess : ExitCode() { data class ExitFailure(override val code: Int) : ExitCode() -fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = +fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) - fun IO<Any>.unit() = map { Unit } fun <T> Mono<T>.asIo() = IO.async<T> { callback -> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index e7aca55d..99ecfd74 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -71,4 +71,4 @@ fun <T> Flux<T>.filterFailedWithLog(logger: Logger, logger.trace(context, it) Mono.just<T>(t) }) - }
\ No newline at end of file + } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index bdb63b68..b8784c64 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,7 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.netty.DisposableServer +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -37,10 +39,17 @@ abstract class ServerHandle(val host: String, val port: Int) { */ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { override fun shutdown() = IO { - ctx.disposeNow() + logger.info { "Graceful shutdown" } + ctx.disposeNow(SHUTDOWN_TIMEOUT) + logger.info { "Server disposed" } } override fun await() = IO<Unit> { ctx.channel().closeFuture().sync() } + + companion object { + val logger = Logger(NettyServerHandle::class) + private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + } } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt index a86e3d50..2678a8d5 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,25 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl +package org.onap.dcae.collectors.veshv.utils -// TODO: probably should be merged with HealthDescription or made similiar to it -internal object XnfStatus { +import arrow.effects.IO - const val BUSY = "Busy" - const val IDLE = "Idle" - const val DETAILED_STATUS_NODE = "Detailed status" +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since January 2019 + */ + +fun registerShutdownHook(job: () -> Unit) { + Runtime.getRuntime().addShutdownHook(object : Thread() { + override fun run() { + job() + } + }) +} + +fun registerShutdownHook(job: IO<Unit>) = IO { + registerShutdownHook { + job.unsafeRunSync() + } } diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml index de262567..33197dd1 100644 --- a/sources/hv-collector-xnf-simulator/pom.xml +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -101,6 +101,11 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-health-check</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-test-utils</artifactId> <version>${project.parent.version}</version> @@ -111,6 +116,10 @@ <artifactId>arrow-effects</artifactId> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-instances</artifactId> + </dependency> + <dependency> <groupId>org.jetbrains.kotlinx</groupId> <artifactId>kotlinx-coroutines-core</artifactId> </dependency> diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index ca6d169a..8de7da32 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -35,6 +35,7 @@ import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpClient +import reactor.util.concurrent.Queues.XS_BUFFER_SIZE /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -43,8 +44,7 @@ import reactor.netty.tcp.TcpClient class VesHvClient(private val configuration: SimulatorConfiguration) { private val client: TcpClient = TcpClient.create() - .host(configuration.vesHost) - .port(configuration.vesPort) + .addressSupplier { configuration.hvVesAddress } .configureSsl() private fun TcpClient.configureSsl() = @@ -61,14 +61,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .handle { _, output -> handler(complete, messages, output) } .connect() .doOnError { - logger.info { - "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" - } + logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } } .subscribe { - logger.info { - "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" - } + logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } } return complete.then() } @@ -81,7 +77,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { val encoder = WireFrameEncoder(allocator) val frames = messages .map(encoder::encode) - .window(MAX_BATCH_SIZE) + .window(XS_BUFFER_SIZE) return nettyOutbound .logConnectionClosed() @@ -99,13 +95,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun NettyOutbound.logConnectionClosed() = withConnection { conn -> - conn.onTerminate().subscribe { + conn.onDispose { logger.info { "Connection to ${conn.address()} has been closed" } } } companion object { private val logger = Logger(VesHvClient::class) - private const val MAX_BATCH_SIZE = 128 } } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index a0785620..654f16a6 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -23,10 +23,6 @@ import arrow.core.Either import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.BUSY -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.DETAILED_STATUS_NODE -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.IDLE -import org.onap.dcae.collectors.veshv.utils.http.HttpConstants import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors @@ -38,8 +34,8 @@ import ratpack.handling.Context import ratpack.http.TypedData import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import java.net.InetSocketAddress import java.util.* -import javax.json.Json /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,9 +45,10 @@ internal class XnfApiServer( private val xnfSimulator: XnfSimulator, private val ongoingSimulations: OngoingSimulations) { - fun start(port: Int): IO<RatpackServer> = IO { + fun start(socketAddress: InetSocketAddress): IO<RatpackServer> = IO { RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) + server.serverConfig(ServerConfig.embedded() + .port(socketAddress.port)) .handlers(this::configureHandlers) } } @@ -61,7 +58,6 @@ internal class XnfApiServer( .post("simulator", ::startSimulationHandler) .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) - .get("healthcheck", ::healthcheckHandler) } private fun startSimulationHandler(ctx: Context) { @@ -93,21 +89,6 @@ internal class XnfApiServer( ctx.response.sendAndHandleErrors(IO.just(response)) } - private fun healthcheckHandler(ctx: Context) { - val healthCheckDetailedMessage = createHealthCheckDetailedMessage() - val simulatorStatus = HttpConstants.STATUS_OK - logger.info { "Returning simulator status: ${simulatorStatus} ${healthCheckDetailedMessage}" } - ctx.response.status(simulatorStatus).send(healthCheckDetailedMessage) - } - - private fun createHealthCheckDetailedMessage() = - Json.createObjectBuilder() - .add(DETAILED_STATUS_NODE, when { - ongoingSimulations.isAnySimulationPending() -> BUSY - else -> IDLE - }) - .build().toString() - companion object { private val logger = Logger(XnfApiServer::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt new file mode 100644 index 00000000..5e1c979c --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono + + +internal class XnfHealthCheckServer { + fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config) + .start() + .map { logger.info(serverStartedMessage(it)); it } + + private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer { + val monitoring = object : PrometheusMetricsProvider { + override fun lastStatus(): Mono<String> = Mono.just("not implemented") + } + return HealthCheckApiServer( + HealthState.INSTANCE, + monitoring, + config.healthCheckApiListenAddress) + } + + private fun serverStartedMessage(handle: ServerHandle) = + { "Health check server is up and listening on ${handle.host}:${handle.port}" } + + companion object { + private val logger = Logger(XnfHealthCheckServer::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 0b321362..7885514b 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -28,17 +28,19 @@ import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue +import java.net.InetSocketAddress /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,6 +51,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon VES_HV_PORT, VES_HV_HOST, LISTEN_PORT, + HEALTH_CHECK_API_PORT, MAXIMUM_PAYLOAD_SIZE_BYTES, SSL_DISABLE, KEY_STORE_FILE, @@ -61,14 +64,20 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() val vesPort = cmdLine.intValue(VES_HV_PORT).bind() + val healthCheckApiListenAddress = cmdLine.intValue(HEALTH_CHECK_API_PORT, + DefaultValues.HEALTH_CHECK_API_PORT) val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) SimulatorConfiguration( - listenPort, - vesHost, - vesPort, + InetSocketAddress(listenPort), + InetSocketAddress(healthCheckApiListenAddress), + InetSocketAddress(vesHost, vesPort), maxPayloadSizeBytes, createSecurityConfiguration(cmdLine).bind()) }.fix() + + internal object DefaultValues { + const val HEALTH_CHECK_API_PORT = 6063 + } } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 3395d282..5a0e73c7 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -20,14 +20,15 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ data class SimulatorConfiguration( - val listenPort: Int, - val vesHost: String, - val vesPort: Int, + val listenAddress: InetSocketAddress, + val healthCheckApiListenAddress: InetSocketAddress, + val hvVesAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index bd58dd9c..fb71b2cd 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO import kotlinx.coroutines.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.* @@ -32,13 +35,15 @@ import java.util.concurrent.Executors * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(), + private val healthState: HealthState = HealthState.INSTANCE) { private val asyncSimulationContext = executor.asCoroutineDispatcher() private val simulations = ConcurrentHashMap<UUID, Status>() fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { val id = UUID.randomUUID() simulations[id] = StatusOngoing + updateHealthState() simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> result.fold( @@ -50,20 +55,22 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { logger.info { "Finished sending messages" } simulations[id] = StatusSuccess } - ) + ).also { updateHealthState() } } return id } - fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + private fun updateHealthState() = healthState.changeState(currentState()) + + private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE - fun isAnySimulationPending() = simulations.any { + internal fun isAnySimulationPending() = simulations.any { status(it.key) is StatusOngoing } - internal fun clear() { - simulations.clear() - } + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() = simulations.clear() companion object { private val logger = Logger(XnfApiServer::class) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 91070d35..308c6864 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,17 +19,25 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monad.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import ratpack.server.RatpackServer private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf" private val logger = Logger(PACKAGE_NAME) @@ -41,15 +49,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map { config -> - logger.info { "Using configuration: $config" } - val xnfSimulator = XnfSimulator( - VesHvClient(config), - MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) - XnfApiServer(xnfSimulator, OngoingSimulations()) - .start(config.listenPort) - .unit() - } + .map(::startServers) .unsafeRunEitherSync( { ex -> logger.withError { log("Failed to start a server", ex) } @@ -57,5 +57,18 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) }, { logger.info { "Started xNF Simulator API server" } + HealthState.INSTANCE.changeState(HealthDescription.IDLE) } ) + +private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = + IO.monad().binding { + logger.info { "Using configuration: $config" } + XnfHealthCheckServer().startServer(config).bind() + val xnfSimulator = XnfSimulator( + VesHvClient(config), + MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenAddress) + .bind() + }.fix() |