diff options
8 files changed, 390 insertions, 68 deletions
diff --git a/development/bin/run-xnf-simulator.sh b/development/bin/run-xnf-simulator.sh index 3fe96928..d8de0097 100755 --- a/development/bin/run-xnf-simulator.sh +++ b/development/bin/run-xnf-simulator.sh @@ -20,7 +20,7 @@ set -euo pipefail usage() { - echo "Start xnf-simulator container on given port and inside of given docker-network (by default 'development_default')" + echo "Start xnf-simulator container on given port and inside of given docker-network" echo "Usage: $0 [-h|--help] [-v|--verbose] <xnf listen port> [<hv ves docker network>]" exit 1 } @@ -31,27 +31,20 @@ while getopts "$optspec" arg; do -) # handle longopts case "${OPTARG}" in verbose) - VERBOSE=True - ;; + VERBOSE=True ;; help) - usage - ;; + usage ;; *) echo "Unknown option --${OPTARG}" >&2 - usage - ;; - esac - ;; + usage ;; + esac ;; v) - VERBOSE=True - ;; + VERBOSE=True ;; h) - usage - ;; + usage ;; *) echo "Unknown option -${OPTARG}" >&2 - usage - ;; + usage ;; esac done shift $((OPTIND-1)) @@ -60,22 +53,33 @@ shift $((OPTIND-1)) LISTEN_PORT=$1 -HV_VES_NETWORK=${2:-development_default} +if [ $# -gt 1 ]; then + HV_VES_NETWORK=${2} +fi PORTS="${LISTEN_PORT}:${LISTEN_PORT}/tcp" -HV_VES_REPO_HOME=`pwd`/.. +HV_VES_REPO_HOME=$(realpath $(dirname "$0"))/.. if [ -n "${VERBOSE+x}" ]; then - echo "Starting xnf-simulator with ports configuration: ${PORTS} on network: ${HV_VES_NETWORK}" + echo "Starting xnf-simulator with ports configuration: ${PORTS}" echo "Container id:" fi -docker run -d \ + +XNF_CONTAINER_ID=$(docker run -d \ -v ${HV_VES_REPO_HOME}/ssl/:/etc/ves-hv/ \ -p ${PORTS} \ - --network ${HV_VES_NETWORK} \ onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator \ --listen-port ${LISTEN_PORT} \ --ves-host ves-hv-collector \ --ves-port 6061 \ --key-store-password onaponap \ - --trust-store-password onaponap
\ No newline at end of file + --trust-store-password onaponap) + +echo $XNF_CONTAINER_ID + +if [ -n "${HV_VES_NETWORK+x}" ]; then + if [ -n "${VERBOSE+x}" ]; then + echo "Adding container to network: ${HV_VES_NETWORK}" + fi + docker network connect ${HV_VES_NETWORK} ${XNF_CONTAINER_ID} +fi diff --git a/development/bin/start-simulation.sh b/development/bin/start-simulation.sh new file mode 100755 index 00000000..beede920 --- /dev/null +++ b/development/bin/start-simulation.sh @@ -0,0 +1,267 @@ +#!/usr/bin/env bash +# ============LICENSE_START======================================================= +# dcaegen2-collectors-veshv +# ================================================================================ +# Copyright (C) 2018 NOKIA +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +set -euo pipefail + + +function usage() { + echo "" + echo "Send messages to hv-ves from multiple xNF simulators" + echo "Usage: $0 [-h|--help] [-v|--verbose] [--messages-in-batch] [--docker-network] [--xnf-logs-directory]" + echo " <hv ves hostname> <hv ves port> <simulators amount> <messages batches amount per simulator> <messages sending interval>" + echo "" + echo " - hv ves hostname : HighVolume VES Collector network hostname" + echo " - hv ves port : HighVolume VES Collector network port" + echo " - simulators amount : Amount of xNF simulators to be launched" + echo " - messages amount per simulator : Amount of messages to be sent from each xNF simulator to HV-VES" + echo " - messages sending interval : interval in seconds between sending messages from xNFs" + echo "Optional parameters:" + echo " - messages-in-batch : Amount of messages sent on each request" + echo " - docker-network : Docker network to which xNF simulators should be added" + echo " - xnf-logs-directory : Path to directory where logs from all xNF simulators should be stored" + echo "Example invocations:" + echo "./start-simulation.sh --messages-in-batch=5 --docker-network=development_default ves-hv-collector 6061 10 20 0.5" + echo "./start-simulation.sh --messages-in-batch=5 --xnf-logs-directory=/tmp/xnf-simulation localhost 6061 10 20 0.5" + exit 1 +} + +function verbose_log() { + if [ -n "${VERBOSE+x}" ]; then + echo $@ + fi +} + +function create_logs_dir() { + if [ -n "${XNF_LOGS_DIRECTORY+x}" ]; then + if [ ! -d "${XNF_LOGS_DIRECTORY}" ]; then + mkdir ${XNF_LOGS_DIRECTORY} + fi + fi +} + +function create_xNFs_simulators() { + for i in $(seq 1 ${XNFS_AMOUNT}); do + local XNF_PORT=$(get_unoccupied_port 32000 65000) + verbose_log "Starting xNF simulator container on port ${XNF_PORT} using run-xnf-simulator script" + XNF_CONTAINER_ID=$(${DEVELOPMENT_BIN_DIRECTORY}/run-xnf-simulator.sh $XNF_PORT ${DOCKER_NETWORK:-}) + CREATED_XNF_SIMULATORS_PORTS+=(${XNF_PORT}) + verbose_log "Container id: ${XNF_CONTAINER_ID}" + CREATED_XNF_SIMULATORS_IDS+=(${XNF_CONTAINER_ID}) + done +} + +function get_unoccupied_port() { + local LPORT=$1 + local UPORT=$2 + while true; do + local MPORT=$[$LPORT + ($RANDOM % $UPORT)]; + local LISTENING_PORTS=$(osqueryi --header=false --list "select port from listening_ports order by port"); + if (echo "${LISTENING_PORTS[@]}" | grep -xqv $MPORT); then + echo $MPORT; + break; + fi + done +} + +function wait_for_containers_startup_or_fail() { + local seconds_to_wait=10 + local all_containers_healthy=1 + + verbose_log "Waiting ${seconds_to_wait}s for containers startup" + set +e + for i in $(seq 1 ${seconds_to_wait}); do + verbose_log "Try no. ${i}" + all_containers_healthy=1 + for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do + verbose_log "Checking container on port ${port}" + local status_code=$(curl -s -o /dev/null -I -w "%{http_code}" localhost:${port}/healthcheck) + if [ $status_code -ne 200 ]; then + verbose_log "Container on port ${port} is unhealthy " + all_containers_healthy=0 + break + fi + done + if [ $all_containers_healthy -eq 1 ]; then + break + fi + sleep 1 + done + set -e + + if [ $all_containers_healthy -ne 1 ]; then + echo "Some xNFs simulators failed at startup. Trying to cleanup..." + cleanup + echo "Exitting..." + exit 2 + fi +} + +function start_simulation() { + verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves + ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}" + for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do + start_single_simulation $port $MESSAGES_IN_BATCH & + done +} + +function start_single_simulation() { + local port=$1 + local messages_to_be_sent=$2 + local message_type="VALID" + for i in $(seq 1 ${MESSAGE_BATCHES_AMOUNT}); do + ${DEVELOPMENT_BIN_DIRECTORY}/xnf-simulation.sh $port $messages_to_be_sent $message_type > /dev/null & + sleep $MESSAGES_SENDING_INTERVAL + done +} + +function assure_all_xNFs_requests_were_sent { + WAIT_TIME_FOR_REQUESTS_TO_BE_SENT=$(echo ";1 + $MESSAGES_SENDING_INTERVAL * $MESSAGE_BATCHES_AMOUNT" | bc) + echo "Waiting ${WAIT_TIME_FOR_REQUESTS_TO_BE_SENT}s for all xNF requests to be sent" + sleep $WAIT_TIME_FOR_REQUESTS_TO_BE_SENT +} + +function wait_for_simulators_to_finish_sending_messages() { + local seconds_to_wait=$1 + local all_containers_finished=1 + + echo "Waiting up to ${seconds_to_wait}s for xNFs simulators to finish sending messages" + for i in $(seq 1 ${seconds_to_wait}); do + verbose_log "Wait no. ${i}" + all_containers_finished=1 + for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do + local container_status=$(curl --request GET -s localhost:${port}/healthcheck | jq -r '.["Detailed status"]') + + verbose_log "Container on port ${port} status: ${container_status}" + if [ "${container_status}" = "Busy" ]; then + all_containers_finished=0 + break + fi + done + if [ $all_containers_finished -eq 1 ]; then + echo "All containers finished sending messages" + break + fi + sleep 1 + done +} + +function cleanup() { + echo "Cleaning up" + set +e + for container_id in ${CREATED_XNF_SIMULATORS_IDS[@]}; do + verbose_log "Stopping container: ${container_id}" + docker stop $container_id > /dev/null + if [ -n "${XNF_LOGS_DIRECTORY+x}" ]; then + local log_file=${XNF_LOGS_DIRECTORY}/${container_id}.log + verbose_log "Writing container logs to: ${log_file}" + docker logs ${container_id} > $log_file + fi + verbose_log "Removing container: ${container_id}" + docker rm $container_id > /dev/null + done + set -e +} + + +function parse_long_opts_with_arguments() { + if [[ ${OPTARG} =~ .*=.* ]] # is option in --key=value format + then + OPT=${OPTARG/=*/} + ((${#OPT} <= 1)) && { + echo "Invalid option '$OPT'" >&2 + exit 2 + } + OPTARG=${OPTARG#*=} + else + echo -e "No value provided for ${OPTARG}. Please use \"--${OPTARG}=VALUE\" format." >&2 + usage + fi +} + +# parse command line +optspec=":vh-:" # catch v, h and - +while getopts "$optspec" arg; do + case "${arg}" in + -) # handle longopts + case "${OPTARG}" in + verbose) + VERBOSE=True ;; + help) + usage ;; + *) + parse_long_opts_with_arguments + case "${OPT}" in + messages-in-batch) + MESSAGES_IN_BATCH=$OPTARG ;; + docker-network) + DOCKER_NETWORK=$OPTARG ;; + xnf-logs-directory) + XNF_LOGS_DIRECTORY=$OPTARG ;; + *) + usage ;; + esac ;; + esac ;; + v) + VERBOSE=True ;; + h) + usage ;; + *) + echo "Unknown option -${OPTARG}" >&2 + usage ;; + esac +done +shift $((OPTIND-1)) + +[ $# -le 4 ] && (echo -e "Unsufficient arguments"; usage) + + +DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0")) +HV_VES_HOSTNAME=${1} +HV_VES_PORT=${2} +XNFS_AMOUNT=${3} +MESSAGE_BATCHES_AMOUNT=${4} +MESSAGES_SENDING_INTERVAL=${5} + +# set defaults if absent +[ -z "${MESSAGES_IN_BATCH}" ] && MESSAGES_IN_BATCH=1 + +create_logs_dir + + +CREATED_XNF_SIMULATORS_PORTS=() +CREATED_XNF_SIMULATORS_IDS=() +echo "Creating ${XNFS_AMOUNT} xNFs simulators" +trap cleanup SIGINT SIGTERM +create_xNFs_simulators + +wait_for_containers_startup_or_fail + +echo "All xNFs containers are healthy, starting simulation" +start_simulation + +assure_all_xNFs_requests_were_sent + +assumed_message_sending_time=$(echo ";0.00025 * $XNFS_AMOUNT" | bc) +seconds_to_wait=$(echo ";$assumed_message_sending_time * $MESSAGE_BATCHES_AMOUNT * $MESSAGES_IN_BATCH" | bc) +wait_for_simulators_to_finish_sending_messages $seconds_to_wait +# there might be network lag between moment when xNF finished sending messages and they actually are received by hv-ves +# thus we cannot start removing xNFs immediately to prevent closing socket channels +sleep 5 + +cleanup
\ No newline at end of file diff --git a/development/bin/xnf-simulation.sh b/development/bin/xnf-simulation.sh index e1d65aa0..c3025b9b 100755 --- a/development/bin/xnf-simulation.sh +++ b/development/bin/xnf-simulation.sh @@ -65,7 +65,8 @@ if [ -n "${VERBOSE+x}" ]; then echo "Requesting xnf-simulator on port ${XNF_PORT} to send ${MESSAGES_AMOUNT} messages of type ${MESSAGES_TYPE}" fi -REQUEST_ID=$(curl --request POST -s localhost:${XNF_PORT}/${XNF_ENDPOINT} -d " +currentTimeMicros=$((`date +%s%N`/1000)) +REQUEST_ID=$(curl --request POST -s --header 'Content-Type: application/json' localhost:${XNF_PORT}/${XNF_ENDPOINT} -d " [ { \"commonEventHeader\": { @@ -77,7 +78,7 @@ REQUEST_ID=$(curl --request POST -s localhost:${XNF_PORT}/${XNF_ENDPOINT} -d " \"eventName\": \"sample-event-name\", \"eventType\": \"sample-event-type\", \"startEpochMicrosec\": 120034455, - \"lastEpochMicrosec\": 120034455, + \"lastEpochMicrosec\": $currentTimeMicros, \"nfNamingCode\": \"sample-nf-naming-code\", \"nfcNamingCode\": \"sample-nfc-naming-code\", \"reportingEntityId\": \"sample-reporting-entity-id\", diff --git a/development/start-simulation.sh b/development/start-simulation.sh deleted file mode 100755 index 6f38ea7b..00000000 --- a/development/start-simulation.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env bash -# TODO: Merge this file with bin/xnf-simulation.sh - -currentTimeMicros=$((`date +%s%N`/1000)) - -curl --header 'Content-Type: application/json' --request POST \ - --data '[ - { - "commonEventHeader": { - "version": "sample-version", - "domain": "perf3gpp", - "sequence": 1, - "priority": 1, - "eventId": "sample-event-id", - "eventName": "sample-event-name", - "eventType": "sample-event-type", - "startEpochMicrosec": 1545049703000000, - "lastEpochMicrosec": '$currentTimeMicros', - "nfNamingCode": "sample-nf-naming-code", - "nfcNamingCode": "sample-nfc-naming-code", - "reportingEntityId": "sample-reporting-entity-id", - "reportingEntityName": "sample-reporting-entity-name", - "sourceId": "sample-source-id", - "sourceName": "sample-source-name", - "vesEventListenerVersion": "7.2" - }, - "messageType": "VALID", - "messagesAmount": 1000000 - } - ]' \ - http://localhost:6062/simulator/async diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index cfd3a6e9..a0785620 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -20,10 +20,12 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import arrow.core.Either -import arrow.core.getOrElse import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.BUSY +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.DETAILED_STATUS_NODE +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.IDLE import org.onap.dcae.collectors.veshv.utils.http.HttpConstants import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses @@ -37,6 +39,7 @@ import ratpack.http.TypedData import ratpack.server.RatpackServer import ratpack.server.ServerConfig import java.util.* +import javax.json.Json /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -58,10 +61,7 @@ internal class XnfApiServer( .post("simulator", ::startSimulationHandler) .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) - .get("healthcheck") { ctx -> - logger.info { "Checking health" } - ctx.response.status(HttpConstants.STATUS_OK).send() - } + .get("healthcheck", ::healthcheckHandler) } private fun startSimulationHandler(ctx: Context) { @@ -82,6 +82,7 @@ internal class XnfApiServer( .map(Responses::acceptedResponse) } + private fun simulatorStatusHandler(ctx: Context) { logger.debug { "Checking task status" } val id = UUID.fromString(ctx.pathTokens["id"]) @@ -92,6 +93,21 @@ internal class XnfApiServer( ctx.response.sendAndHandleErrors(IO.just(response)) } + private fun healthcheckHandler(ctx: Context) { + val healthCheckDetailedMessage = createHealthCheckDetailedMessage() + val simulatorStatus = HttpConstants.STATUS_OK + logger.info { "Returning simulator status: ${simulatorStatus} ${healthCheckDetailedMessage}" } + ctx.response.status(simulatorStatus).send(healthCheckDetailedMessage) + } + + private fun createHealthCheckDetailedMessage() = + Json.createObjectBuilder() + .add(DETAILED_STATUS_NODE, when { + ongoingSimulations.isAnySimulationPending() -> BUSY + else -> IDLE + }) + .build().toString() + companion object { private val logger = Logger(XnfApiServer::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index d7d42d88..bd58dd9c 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -57,6 +57,10 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + fun isAnySimulationPending() = simulations.any { + status(it.key) is StatusOngoing + } + internal fun clear() { simulations.clear() } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt new file mode 100644 index 00000000..a86e3d50 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +// TODO: probably should be merged with HealthDescription or made similiar to it +internal object XnfStatus { + + const val BUSY = "Busy" + const val IDLE = "Idle" + const val DETAILED_STATUS_NODE = "Detailed status" +} diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt index a04da7bf..113c3c42 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -47,7 +47,7 @@ internal class OngoingSimulationsTest : Spek({ given("not existing task task id") { val id = UUID.randomUUID() - on("status") { + on("asking for status") { val result = cut.status(id) it("should have 'not found' status") { @@ -56,8 +56,16 @@ internal class OngoingSimulationsTest : Spek({ } } + given("no tasks") { + on("quering about any pending task") { + it("should return false") { + assertThat(cut.isAnySimulationPending()).isFalse() + } + } + } + given("never ending task") { - val task = IO.async<Unit> { } + val task = neverendingTask() on("startAsynchronousSimulation") { val result = cut.startAsynchronousSimulation(task) @@ -65,33 +73,48 @@ internal class OngoingSimulationsTest : Spek({ it("should have ongoing status") { assertThat(cut.status(result)).isEqualTo(StatusOngoing) } + + it("should return true when asked about any pending tasks") { + assertThat(cut.isAnySimulationPending()).isTrue() + } } } given("failing task") { - val cause = RuntimeException("facepalm") - val task = IO.raiseError<Unit>(cause) + val (cause, task) = failingTask() on("startAsynchronousSimulation") { - val result = cut.startAsynchronousSimulation(task) + val taskID = cut.startAsynchronousSimulation(task) it("should have failing status") { waitUntilSucceeds { - assertThat(cut.status(result)).isEqualTo(StatusFailure(cause)) + assertThat(cut.status(taskID)).isEqualTo(StatusFailure(cause)) + } + } + + it("should return false when asked about any pending tasks") { + waitUntilSucceeds { + assertThat(cut.isAnySimulationPending()).isFalse() } } } } given("successful task") { - val task = IO { println("great success!") } + val task = succesfulTask() on("startAsynchronousSimulation") { - val result = cut.startAsynchronousSimulation(task) + val taskID = cut.startAsynchronousSimulation(task) it("should have successful status") { waitUntilSucceeds { - assertThat(cut.status(result)).isEqualTo(StatusSuccess) + assertThat(cut.status(taskID)).isEqualTo(StatusSuccess) + } + } + + it("should return false when asked about any pending tasks") { + waitUntilSucceeds { + assertThat(cut.isAnySimulationPending()).isFalse() } } } @@ -104,3 +127,13 @@ internal class OngoingSimulationsTest : Spek({ afterEachTest { cut.clear() } }) + +private fun neverendingTask() = IO.async<Unit> { } + +private fun succesfulTask(): IO<Unit> = IO { println("great success!") } + +private fun failingTask(): Pair<RuntimeException, IO<Unit>> { + val cause = RuntimeException("facepalm") + val task = IO.raiseError<Unit>(cause) + return Pair(cause, task) +} |