diff options
62 files changed, 2232 insertions, 498 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 33aedeca..f9f52b4e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,7 +28,7 @@ services: command: ["-server", "-bootstrap", "-ui-dir", "/ui"] ves-hv-collector: - image: nexus3.onap.org:10003/onap/ves-hv-collector:latest + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest # build: # context: hv-collector-main # dockerfile: Dockerfile @@ -51,7 +51,7 @@ services: - ./ssl/:/etc/ves-hv/ xnf-simulator: - image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator # build: # context: hv-collector-xnf-simulator # dockerfile: Dockerfile @@ -64,7 +64,7 @@ services: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: - image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator # build: # context: hv-collector-dcae-app-simulator # dockerfile: Dockerfile diff --git a/hv-collector-analysis/pom.xml b/hv-collector-analysis/pom.xml index a4d0a738..e9ffcf36 100644 --- a/hv-collector-analysis/pom.xml +++ b/hv-collector-analysis/pom.xml @@ -19,32 +19,41 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> - <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> - <artifactId>hv-collector-analysis</artifactId> - <version>1.0.0-SNAPSHOT</version> - <description>VES HighVolume Collector :: Code analysis configuration</description> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-analysis</artifactId> + <version>1.0.0-SNAPSHOT</version> + <description>VES HighVolume Collector :: Code analysis configuration</description> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>3.1.0</version> + <configuration> + <encoding>UTF-8</encoding> + </configuration> + </plugin> + </plugins> + </build> </project>
\ No newline at end of file diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 6c256b72..3c85a9b1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -30,7 +31,7 @@ interface Collector { fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Collector +typealias CollectorProvider = () -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index a400ff32..d807a9e7 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) - return collector::get + return collector::getOption } private fun createVesHvCollector(config: CollectorConfiguration): Collector { diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index f858d959..a34be7cd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .receive() .retain() - return collectorProvider() - .handleConnection(nettyInbound.context().channel().alloc(), dataStream) + return collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) + } private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { - logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } - context().channel().close().addListener { - if (it.isSuccess) - logger.debug { "Client disconnected because of idle timeout" } - else - logger.warn("Channel close failed", it.cause()) + logger.info { + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." } + disconnectClient() } return this } + private fun NettyInbound.disconnectClient() { + context().channel().close().addListener { + if (it.isSuccess) + logger.debug { "Channel (${remoteAddress()}) closed successfully." } + else + logger.warn("Channel close failed", it.cause()) + } + } + private fun NettyInbound.logConnectionClosed(): NettyInbound { context().onClose { logger.info("Connection from ${remoteAddress()} has been closed") diff --git a/hv-collector-coverage/check-coverage.sh b/hv-collector-coverage/check-coverage.sh new file mode 100755 index 00000000..956891ac --- /dev/null +++ b/hv-collector-coverage/check-coverage.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +JACOCO_REPORT="$1" +MIN_COVERAGE_PERCENT="$2" +LOG_FILE=target/check-coverage.log + +function coverage_from_report() { + local xpath_expr="string(/report/counter[@type='INSTRUCTION']/@$1)" + xpath -q -e "$xpath_expr" "$JACOCO_REPORT" 2>> ${LOG_FILE} +} + +missed=$(coverage_from_report missed) +covered=$(coverage_from_report covered) +total=$(($missed + $covered)) +coverage=$((100 * $covered / $total)) + +if [[ $(wc -c < ${LOG_FILE}) > 0 ]]; then + echo "Warnings from xpath evaluation:" + cat ${LOG_FILE} + echo +fi + +echo "Coverage: $coverage% (covered/total: $covered/$total)" + +if [[ ${coverage} -lt ${MIN_COVERAGE_PERCENT} ]]; then + echo "Coverage is too low. Minimum coverage: $MIN_COVERAGE_PERCENT%" + exit 1 +fi + diff --git a/hv-collector-coverage/pom.xml b/hv-collector-coverage/pom.xml index f988f8ec..31450918 100644 --- a/hv-collector-coverage/pom.xml +++ b/hv-collector-coverage/pom.xml @@ -60,7 +60,7 @@ </goals> <configuration> <excludes> - <!-- Exclute Protobuf-generated classes --> + <!-- Exclude Protobuf-generated classes --> <exclude>org/onap/ves/*</exclude> </excludes> <dataFileIncludes> @@ -71,6 +71,28 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <executions> + <execution> + <id>check-coverage</id> + <phase>verify</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <skip>${skipTests}</skip> + <executable>${project.basedir}/check-coverage.sh</executable> + <workingDirectory>${project.basedir}</workingDirectory> + <arguments> + <argument>target/site/jacoco-aggregate/jacoco.xml</argument> + <argument>${jacoco.minimum.coverage}</argument> + </arguments> + </configuration> + </plugin> </plugins> </build> diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 1e22d4c0..ba29844a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -149,7 +149,7 @@ object PerformanceSpecification : Spek({ val outputDigest = digest.digest() - assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize) assertThat(outputDigest).isEqualTo(inputDigest) } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e9b70578..942e6edf 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.getOrElse import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider() + get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index 47f71ba6..ce4a2715 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -19,8 +19,8 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <licenses> @@ -105,6 +105,14 @@ <artifactId>arrow-effects</artifactId> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-reactor</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + </dependency> + <dependency> <groupId>io.ratpack</groupId> <artifactId>ratpack-core</artifactId> </dependency> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt new file mode 100644 index 00000000..262e05bf --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.getOrElse +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.io.InputStream +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, + private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() + + fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) + + fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { + if (topics.any { it.isBlank() }) + throw IllegalArgumentException("Topic list cannot contain empty elements") + if (topics.isEmpty()) + throw IllegalArgumentException("Topic list cannot be empty") + + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) + }.fix() + + fun state() = consumerState.getOption().map { it.currentState() } + + fun resetState(): IO<Unit> = consumerState.getOption().fold( + { IO.unit }, + { it.reset() } + ) + + fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + + private fun currentMessages(): List<ByteArray> = + consumerState.getOption() + .map { it.currentState().consumedMessages } + .getOrElse(::emptyList) + + private fun extractTopics(topicsString: String): Set<String> = + topicsString.substringAfter("=") + .split(",") + .toSet() + + companion object { + private val logger = Logger(DcaeAppSimulator::class) + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt new file mode 100644 index 00000000..354edaeb --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5 +import java.io.InputStream +import javax.json.Json + +class MessageStreamValidation( + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = + IO.monadError().bindingCatch { + val messageParams = parseMessageParams(jsonDescription) + val expectedEvents = generateEvents(messageParams).bind() + val actualEvents = decodeConsumedEvents(consumedMessages) + if (shouldValidatePayloads(messageParams)) { + expectedEvents == actualEvents + } else { + validateHeaders(actualEvents, expectedEvents) + } + }.fix() + + private fun parseMessageParams(input: InputStream): List<MessageParameters> { + val expectations = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(expectations) + + return messageParams.fold( + { throw IllegalArgumentException("Parsing error: " + it.message) }, + { + if (it.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + it + } + ) + } + + private fun shouldValidatePayloads(parameters: List<MessageParameters>) = + parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + + + private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean { + val consumedHeaders = actual.map { it.commonEventHeader } + val generatedHeaders = expected.map { it.commonEventHeader } + return generatedHeaders == consumedHeaders + } + + + private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> = + messageGenerator.createMessageFlux(parameters) + .map(PayloadWireFrameMessage::payload) + .map(ByteData::unsafeAsArray) + .map(VesEventV5.VesEvent::parseFrom) + .collectList() + .asIo() + + private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = + consumedMessages.map(VesEventV5.VesEvent::parseFrom) + +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt new file mode 100644 index 00000000..1eca9317 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -0,0 +1,101 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendOrError +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = "validation succeeded" + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = "validation failed", + httpStatus = HttpStatus.BAD_REQUEST + ) + } + + + fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = + simulator.listenToTopics(kafkaTopics).map { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(::setupHandlers) + } + } + + private fun setupHandlers(chain: Chain) { + chain + .put("configuration/topics") { ctx -> + ctx.request.body.then { body -> + val operation = simulator.listenToTopics(body.text) + ctx.response.sendOrError(operation) + } + + } + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + ctx.response.sendOrError(simulator.resetState()) + } + .get("messages/all/count") { ctx -> + simulator.state().fold( + { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) }, + { + ctx.response + .contentType(CONTENT_TEXT) + .send(it.messagesCount.toString()) + }) + } + .post("messages/all/validate") { ctx -> + ctx.request.body.then { body -> + val response = simulator.validate(body.inputStream) + .map { isValid -> + if (isValid) responseValid else responseInvalid + } + ctx.response.sendAndHandleErrors(response) + } + } + .get("healthcheck") { ctx -> + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + companion object { + private const val CONTENT_TEXT = "text/plain" + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index d53609ca..15965174 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -17,15 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer +import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions -import java.util.* /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,7 +36,7 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { fun start(): IO<Consumer> = IO { val consumer = Consumer() - receiver.receive().subscribe(consumer::update) + receiver.receive().map(consumer::update).evaluateIo().subscribe() consumer } @@ -43,18 +44,22 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { private val logger = Logger(KafkaSource::class) fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { - val props = HashMap<String, Any>() - props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" - props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" - val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props) + return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + } + + fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { + val props = mapOf<String, Any>( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ) + return ReceiverOptions.create<ByteArray, ByteArray>(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } .subscription(topics) - return KafkaSource(KafkaReceiver.create(receiverOptions)) } } } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index 065cdf92..d5f55605 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config import arrow.core.ForOption import arrow.core.Option diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt index 5bd2d155..c114313d 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config data class DcaeAppSimConfiguration( val apiPort: Int, diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 08bb149f..1eefdbdb 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -17,9 +17,10 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue @@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { val messagesCount: Int by lazy { messages.size } @@ -53,19 +54,17 @@ class Consumer : ConsumerStateProvider { consumedMessages.clear() } - fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } - companion object { private val logger = Logger(Consumer::class) } } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider { - return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync() - } + fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 9f84fc4d..c0f8b340 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,13 +20,14 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.void +import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -50,7 +51,7 @@ fun main(args: Array<String>) = private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - return ApiServer(ConsumerFactory(config.kafkaBootstrapServers)) + return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) .start(config.apiPort, config.kafkaTopics) - .void() -}
\ No newline at end of file + .unit() +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt deleted file mode 100644 index cd258134..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD -import org.onap.ves.VesEventV5.VesEvent -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.publisher.Mono -import javax.json.Json - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class ApiServer(private val consumerFactory: ConsumerFactory, - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - - private lateinit var consumerState: ConsumerStateProvider - - fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO { - consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::setupHandlers) - } - } - - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { it -> - val topics = extractTopics(it.text) - logger.info("Received new configuration. Creating consumer for topics: $topics") - consumerState = consumerFactory.createConsumerForTopics(topics) - ctx.response - .status(STATUS_OK) - .send() - } - - } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - consumerState.reset() - .unsafeRunAsync { - it.fold( - { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) }, - { ctx.response.status(STATUS_OK) } - ).send() - } - } - .get("messages/all/count") { ctx -> - val state = consumerState.currentState() - ctx.response - .contentType(CONTENT_TEXT) - .send(state.messagesCount.toString()) - } - .post("messages/all/validate") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { generateEvents(ctx, it) } - .then { (generatedEvents, shouldValidatePayloads) -> - generatedEvents - .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) } - .block() - } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun generateEvents(ctx: Context, parameters: List<MessageParameters>): - Pair<Mono<List<VesEvent>>, Boolean> = Pair( - - doGenerateEvents(parameters).doOnError { - logger.error("Error occurred when generating messages: $it") - ctx.response - .status(STATUS_INTERNAL_SERVER_ERROR) - .send() - }, - parameters.all { it.messageType == FIXED_PAYLOAD } - ) - - private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE - .createMessageFlux(parameters) - .map(PayloadWireFrameMessage::payload) - .map { decode(it.unsafeAsArray()) } - .collectList() - - - private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes) - - - private fun sendResponse(ctx: Context, - generatedEvents: List<VesEvent>, - shouldValidatePayloads: Boolean) = - resolveResponseStatusCode( - generated = generatedEvents, - consumed = decodeConsumedEvents(), - validatePayloads = shouldValidatePayloads - ).let { ctx.response.status(it).send() } - - - private fun decodeConsumedEvents(): List<VesEvent> = consumerState - .currentState() - .consumedMessages - .map(::decode) - - - private fun resolveResponseStatusCode(generated: List<VesEvent>, - consumed: List<VesEvent>, - validatePayloads: Boolean): Int = - if (validatePayloads) { - if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST - } else { - validateHeaders(consumed, generated) - } - - private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int { - val consumedHeaders = consumed.map { it.commonEventHeader } - val generatedHeaders = generated.map { it.commonEventHeader } - return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST - } - - private fun extractTopics(it: String): Set<String> = - it.substringAfter("=") - .split(",") - .toSet() - - companion object { - private val logger = Logger(ApiServer::class) - private const val CONTENT_TEXT = "text/plain" - - private const val STATUS_OK = 200 - private const val STATUS_BAD_REQUEST = 400 - private const val STATUS_INTERNAL_SERVER_ERROR = 500 - } -} - - diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt new file mode 100644 index 00000000..debe9554 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import reactor.kafka.receiver.ReceiverRecord + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class ConsumerTest : Spek({ + + lateinit var cut: Consumer + + beforeEachTest { + cut = Consumer() + } + + describe("Consumer which holds the state of received Kafka records") { + it("should contain empty state in the beginning") { + assertEmptyState(cut) + } + + describe("update") { + val value = byteArrayOf(2) + + beforeEachTest { + cut.update(receiverRecord( + topic = "topic", + key = byteArrayOf(1), + value = value + )).unsafeRunSync() + } + + it("should contain one message if it was updated once") { + assertState(cut, value) + } + + it("should contain empty state message if it was reset after update") { + cut.reset().unsafeRunSync() + assertEmptyState(cut) + } + } + } +}) + +fun assertEmptyState(cut: Consumer) { + assertState(cut) +} + +fun assertState(cut: Consumer, vararg values: ByteArray) { + assertThat(cut.currentState().consumedMessages) + .containsOnly(*values) + assertThat(cut.currentState().messagesCount) + .isEqualTo(values.size) +} + +fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = + ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt new file mode 100644 index 00000000..c0ba5812 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -0,0 +1,184 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Left +import arrow.core.None +import arrow.core.Some +import arrow.effects.IO +import com.google.protobuf.ByteString +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.eq +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anySet +import org.mockito.Mockito +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class DcaeAppSimulatorTest : Spek({ + lateinit var consumerFactory: ConsumerFactory + lateinit var messageStreamValidation: MessageStreamValidation + lateinit var consumer: Consumer + lateinit var cut: DcaeAppSimulator + + beforeEachTest { + consumerFactory = mock() + messageStreamValidation = mock() + consumer = mock() + cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) + + whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer)) + } + + fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) + + describe("listenToTopics") { + val topics = setOf("hvMeas", "faults") + + it("should fail when topic list is empty") { + val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should fail when topic list contains empty strings") { + val result = cut.listenToTopics(setOf("hvMeas", " ", "faults")).attempt().unsafeRunSync() + assertThat(result.isLeft()).isTrue() + } + + it("should subscribe to given topics") { + cut.listenToTopics(topics).unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should subscribe to given topics when called with comma separated list") { + cut.listenToTopics("hvMeas,faults").unsafeRunSync() + verify(consumerFactory).createConsumerForTopics(topics) + } + + it("should handle errors") { + // given + val error = RuntimeException("WTF") + whenever(consumerFactory.createConsumerForTopics(anySet())) + .thenReturn(IO.raiseError(error)) + + // when + val result = cut.listenToTopics("hvMeas").attempt().unsafeRunSync() + + // then + assertThat(result).isEqualTo(Left(error)) + } + } + + describe("state") { + + it("should return None when topics hasn't been initialized") { + assertThat(cut.state()).isEqualTo(None) + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("hvMeas").unsafeRunSync() + } + + it("should return some state when it has been set") { + val state = consumerState() + whenever(consumer.currentState()).thenReturn(state) + + assertThat(cut.state()).isEqualTo(Some(state)) + } + } + } + + describe("resetState") { + it("should do nothing when topics hasn't been initialized") { + cut.resetState().unsafeRunSync() + verify(consumer, never()).reset() + } + + describe("when topics are initialized") { + beforeEachTest { + cut.listenToTopics("hvMeas").unsafeRunSync() + } + + it("should reset the state") { + // given + whenever(consumer.reset()).thenReturn(IO.unit) + + // when + cut.resetState().unsafeRunSync() + + // then + verify(consumer).reset() + } + } + } + + describe("validate") { + beforeEachTest { + whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true)) + } + + it("should use empty list when consumer is unavailable") { + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), eq(emptyList())) + assertThat(result).isTrue() + } + + it("should delegate to MessageStreamValidation") { + // given + cut.listenToTopics("hvMeas").unsafeRunSync() + whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) + + // when + val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + + // then + verify(messageStreamValidation).validate(any(), any()) + assertThat(result).isTrue() + } + } +}) + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt new file mode 100644 index 00000000..2932367b --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -0,0 +1,225 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.Either +import arrow.core.Left +import arrow.core.None +import arrow.core.Right +import arrow.core.Some +import arrow.effects.IO +import javax.json.stream.JsonParsingException +import com.google.protobuf.ByteString +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.mockito.ArgumentMatchers.anyList +import org.mockito.ArgumentMatchers.anySet +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import java.util.concurrent.ConcurrentLinkedQueue +import javax.json.Json +import javax.json.JsonArray +import javax.json.JsonValue + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +internal class MessageStreamValidationTest : Spek({ + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + lateinit var cut: MessageStreamValidation + + beforeEachTest { + messageParametersParser = mock() + messageGenerator = mock() + cut = MessageStreamValidation(messageParametersParser, messageGenerator) + } + + fun givenParsedMessageParameters(vararg params: MessageParameters) { + whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList())) + } + + describe("validate") { + + it("should return error when JSON is invalid") { + // when + val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() + + // then + when(result) { + is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java) + else -> fail("validation should fail") + } + } + + it("should return error when message param list is empty") { + // given + givenParsedMessageParameters() + + // when + val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() + + // then + assertThat(result.isLeft()).isTrue() + } + + describe("when validating headers only") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return true when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent(eventId = "bbb") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + + describe("when validating whole messages") { + it("should return true when messages are the same") { + // given + val jsonAsStream = sampleJsonAsStream() + val event = vesEvent() + val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val receivedMessageBytes = event.toByteArray() + + givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isTrue() + } + + it("should return false when messages differ with payload only") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent(payload = "payload A") + val receivedEvent = vesEvent(payload = "payload B") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + + it("should return false when messages are different") { + // given + val jsonAsStream = sampleJsonAsStream() + val generatedEvent = vesEvent() + val receivedEvent = vesEvent("bbb") + val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() + + givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + + // when + val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() + + // then + assertThat(result).isFalse() + } + } + } +}) + + + +private const val DUMMY_EVENT_ID = "aaa" +private const val DUMMY_PAYLOAD = "payload" + +private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { + return VesEvent.newBuilder() + .setCommonEventHeader(CommonEventHeader.newBuilder() + .setEventId(eventId)) + .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .build() +} + +private const val sampleJsonArray = """["headersOnly"]""" + +private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream() diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt new file mode 100644 index 00000000..de74f628 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since August 2018 + */ +internal class KafkaSourceTest : Spek({ + val servers = "kafka1:9080,kafka2:9080" + val topics = setOf("topic1", "topic2") + + describe("receiver options") { + val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() + + fun verifyProperty(key: String, expectedValue: Any) { + it("should have $key option set") { + assertThat(options.consumerProperty(key)) + .isEqualTo(expectedValue) + } + } + + verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) + verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") + verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") + verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) + verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + } +})
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt index 7d887939..e7a22fcf 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index b2e42509..c61ab266 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class WireFrameEncoder(private val allocator: ByteBufAllocator) { +class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { fun encode(frame: PayloadWireFrameMessage): ByteBuf { val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 89d1f32e..fa63c36e 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -40,7 +40,7 @@ import kotlin.test.fail */ object WireFrameCodecsTest : Spek({ val payloadAsString = "coffeebabe" - val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT) + val encoder = WireFrameEncoder() val decoder = WireFrameDecoder() fun createSampleFrame() = diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt index 79fc9321..1adf0cad 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.api -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -enum class HealthStatus(val httpResponseStatus: Int) { - UP(OK), - DOWN(SERVICE_UNAVAILABLE), - OUT_OF_SERVICE(SERVICE_UNAVAILABLE), - UNKNOWN(SERVICE_UNAVAILABLE) +enum class HealthStatus(val httpResponseStatus: HttpStatus) { + UP(HttpStatus.OK), + DOWN(HttpStatus.SERVICE_UNAVAILABLE), + OUT_OF_SERVICE(HttpStatus.SERVICE_UNAVAILABLE), + UNKNOWN(HttpStatus.SERVICE_UNAVAILABLE) } diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index 7e9efac7..753f73ef 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -51,7 +51,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) = healthDescription.get().run { - resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message)) + resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message)) } private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) = diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml index 5127e7ef..a0235e17 100644 --- a/hv-collector-main/src/main/resources/logback.xml +++ b/hv-collector-main/src/main/resources/logback.xml @@ -27,9 +27,9 @@ </appender> <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/> <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> <root level="INFO"> diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml index 3960e399..3b6c0e89 100644 --- a/hv-collector-test-utils/pom.xml +++ b/hv-collector-test-utils/pom.xml @@ -51,5 +51,10 @@ <version>${project.parent.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt new file mode 100644 index 00000000..54913744 --- /dev/null +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Either +import arrow.core.identity +import org.assertj.core.api.AbstractAssert +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.ObjectAssert + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +class EitherAssert<A, B>(actual: Either<A, B>) + : AbstractAssert<EitherAssert<A, B>, Either<A, B>>(actual, EitherAssert::class.java) { + + fun isLeft(): EitherAssert<A, B> { + isNotNull() + isInstanceOf(Either.Left::class.java) + return myself + } + + fun left(): ObjectAssert<A> { + isLeft() + val left = actual.fold( + ::identity, + { throw AssertionError("should be left") }) + return assertThat(left) + } + + fun isRight(): EitherAssert<A, B> { + isNotNull() + isInstanceOf(Either.Right::class.java) + return myself + } + + fun right(): ObjectAssert<B> { + isRight() + val right = actual.fold( + { throw AssertionError("should be right") }, + ::identity) + return assertThat(right) + } +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt index 081dd0da..d017b31b 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt @@ -17,15 +17,39 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.http +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Either +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.time.Duration /** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since August 2018 + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 */ -class Status { - companion object { - const val OK = 200 - const val SERVICE_UNAVAILABLE = 503 + +private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils") + +object Assertions : org.assertj.core.api.Assertions() { + fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual) +} + + +fun waitUntilSucceeds(action: () -> Unit) = waitUntilSucceeds(50, Duration.ofMillis(10), action) + +fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) { + var tryNum = 0 + while (tryNum <= retries) { + tryNum++ + try { + logger.debug("Try number $tryNum") + action() + break + } catch (ex: Throwable) { + if (tryNum >= retries) + throw ex + else + Thread.sleep(sleepTime.toMillis()) + } } } diff --git a/hv-collector-test-utils/src/main/kotlin/configurations.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt index 57843b45..57843b45 100644 --- a/hv-collector-test-utils/src/main/kotlin/configurations.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt diff --git a/hv-collector-test-utils/src/main/kotlin/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index c6aa89b2..c6aa89b2 100644 --- a/hv-collector-test-utils/src/main/kotlin/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt diff --git a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index 6aeb6206..6aeb6206 100644 --- a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt diff --git a/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline
\ No newline at end of file diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index f1b7f061..81daf9b2 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -85,6 +85,20 @@ <artifactId>arrow-syntax</artifactId> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + </dependency> + <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -121,7 +135,10 @@ <artifactId>logback-classic</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <scope>provided</scope> + </dependency> </dependencies> - - </project>
\ No newline at end of file diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 39964c1e..a99fef5e 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -20,12 +20,31 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.None +import arrow.core.Option +import arrow.core.Some import arrow.core.identity +import arrow.syntax.collections.firstOption +import java.util.* +import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since July 2018 */ - fun <A> Either<A, A>.flatten() = fold(::identity, ::identity) + +fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) + +fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity) + +fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get()) + +fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> = + if (firstValue != null) + Option.just(firstValue) + else nextValues.asSequence() + .map { it() } + .filter { it != null } + .firstOption() diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index e37b0d7d..05d13094 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -20,7 +20,11 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.Left +import arrow.core.Right import arrow.effects.IO +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import kotlin.system.exitProcess /** @@ -46,4 +50,20 @@ fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitC flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) -fun IO<Any>.void() = map { Unit } +fun IO<Any>.unit() = map { Unit } + +fun <T> Mono<T>.asIo() = IO.async<T> { callback -> + subscribe({ + callback(Right(it)) + }, { + callback(Left(it)) + }) +} + +fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = + flatMap { io -> + io.attempt().unsafeRunSync().fold( + { Flux.error<T>(it) }, + { Flux.just<T>(it) } + ) + } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt index 16634889..1ebe4e48 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt @@ -27,6 +27,7 @@ import arrow.core.getOrElse import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLineParser import org.apache.commons.cli.Options +import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain import java.io.File import java.nio.file.Path import java.nio.file.Paths @@ -77,6 +78,7 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) { protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI()) - private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption): Option<String> = - Option.fromNullable(getOptionValue(cmdLineOpt.option.opt)) + private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain( + getOptionValue(cmdLineOpt.option.opt), + { System.getenv(cmdLineOpt.environmentVariableName()) }) } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt index 836a05df..3a154db2 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt @@ -117,5 +117,14 @@ enum class CommandLineOption(val option: Option) { .longOpt("dummy") .desc("If present will start in dummy mode (dummy external services)") .build() - ), + ); + + fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String = + option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> + "${prefix}_${mainPart}" + } + + companion object { + private const val DEFAULT_ENV_PREFIX = "VESHV" + } } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt new file mode 100644 index 00000000..c5c46397 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import arrow.typeclasses.Show +import java.util.* +import javax.json.Json + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +object HttpConstants { + const val STATUS_OK = 200 + const val STATUS_ACCEPTED = 202 + const val STATUS_BAD_REQUEST = 400 + const val STATUS_NOT_FOUND = 404 + const val STATUS_INTERNAL_SERVER_ERROR = 500 + const val STATUS_SERVICE_UNAVAILABLE = 503 + + const val CONTENT_TYPE_JSON = "application/json" + const val CONTENT_TYPE_TEXT = "text/plain" +} + +enum class HttpStatus(val number: Int) { + OK(HttpConstants.STATUS_OK), + ACCEPTED(HttpConstants.STATUS_ACCEPTED), + BAD_REQUEST(HttpConstants.STATUS_BAD_REQUEST), + NOT_FOUND(HttpConstants.STATUS_NOT_FOUND), + INTERNAL_SERVER_ERROR(HttpConstants.STATUS_INTERNAL_SERVER_ERROR), + SERVICE_UNAVAILABLE(HttpConstants.STATUS_SERVICE_UNAVAILABLE) +} + + +enum class ContentType(val value: String) { + JSON(HttpConstants.CONTENT_TYPE_JSON), + TEXT(HttpConstants.CONTENT_TYPE_TEXT) +} + +data class Response(val status: HttpStatus, val content: Content<Any>) +data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any()) + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +object Responses { + + fun acceptedResponse(id: UUID): Response { + return Response( + HttpStatus.ACCEPTED, + Content(ContentType.TEXT, id) + ) + } + + fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response { + return Response(httpStatus, + Content(ContentType.JSON, + Json.createObjectBuilder() + .add("status", name) + .add("message", message) + .build())) + } +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt new file mode 100644 index 00000000..0282d0c7 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import javax.json.Json + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ + +private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack") + +fun ratpack.http.Response.sendOrError(action: IO<Unit>) { + sendAndHandleErrors(action.map { + Response( + HttpStatus.OK, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("response", "Request accepted").build())) + }) +} + +fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) { + when(response) { + is Either.Left -> send(errorResponse(response.a.toString())) + is Either.Right -> sendAndHandleErrors(IO.just(response.b)) + } +} + +fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) { + response.attempt().unsafeRunSync().fold( + { err -> + logger.warn("Error occurred. Sending .", err) + val message = err.message + send(errorResponse(message)) + }, + ::send + ) +} + +private fun errorResponse(message: String?): Response { + return Response( + HttpStatus.INTERNAL_SERVER_ERROR, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("error", message).build())) +} + +fun ratpack.http.Response.send(response: Response) { + val respWithStatus = status(response.status.number) + response.content.apply { + respWithStatus.send( + type.value, + serializer.run { value.show() }) + } +} diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt new file mode 100644 index 00000000..29359439 --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt @@ -0,0 +1,142 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.arrow + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.jetbrains.spek.api.dsl.xdescribe +import java.util.concurrent.atomic.AtomicReference + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since August 2018 + */ +internal class CoreKtTest : Spek({ + describe("AtomicReference.getOption") { + given("empty atomic reference") { + val atomicReference = AtomicReference<String>() + + on("getOption") { + val result = atomicReference.getOption() + + it("should be None") { + assertThat(result).isEqualTo(None) + } + } + } + given("non-empty atomic reference") { + val initialValue = "reksio" + val atomicReference = AtomicReference(initialValue) + + on("getOption") { + val result = atomicReference.getOption() + + it("should be Some($initialValue)") { + assertThat(result).isEqualTo(Some(initialValue)) + } + } + } + } + + describe("Option.fromNullablesChain") { + given("one non-null element") { + val just = "some text" + on("calling factory") { + val result = Option.fromNullablesChain(just) + + it("should return Some($just)") { + assertThat(result).isEqualTo(Some(just)) + } + } + } + + given("one null element") { + val just: String? = null + on("calling factory") { + val result = Option.fromNullablesChain(just) + + it("should return None") { + assertThat(result).isEqualTo(None) + } + } + } + + given("first non-null element") { + val first = "some text" + val second: String? = null + var secondAskedForValue = false + on("calling factory") { + val result = Option.fromNullablesChain(first, { secondAskedForValue = true; second }) + + it("should return Some($first)") { + assertThat(result).isEqualTo(Some(first)) + } + + it("should have not called second provider (should be lazy)") { + assertThat(secondAskedForValue).isFalse() + } + } + } + + given("two non-null elements") { + val first = "some text" + val second = "another text" + on("calling factory") { + val result = Option.fromNullablesChain(first, { second }) + + it("should return Some($first)") { + assertThat(result).isEqualTo(Some(first)) + } + } + } + + given("two null elements") { + val first: String? = null + val second: String? = null + on("calling factory") { + val result = Option.fromNullablesChain(first, { second }) + + it("should return None") { + assertThat(result).isEqualTo(None) + } + } + } + + given("second non-null element") { + val first: String? = null + val second = "another text" + on("calling factory") { + val result = Option.fromNullablesChain(first, { second }) + + it("should return Some($second)") { + assertThat(result).isEqualTo(Some(second)) + } + } + } + } +}) diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt new file mode 100644 index 00000000..f36df043 --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.commandline + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +class CommandLineOptionTest : Spek({ + describe("command line options enum") { + describe("environment variables") { + given("sample option and prefix") { + val opt = CommandLineOption.KAFKA_SERVERS + val prefix = "CONFIG" + + on("calling environmentVariableName") { + val result = opt.environmentVariableName(prefix) + + it("should return prefixed upper snake cased long option name") { + assertThat(result).isEqualTo("CONFIG_KAFKA_BOOTSTRAP_SERVERS") + } + } + } + + given("sample option without prefix") { + val opt = CommandLineOption.DUMMY_MODE + + on("calling environmentVariableName") { + val result = opt.environmentVariableName() + + it("should return prefixed upper snake cased long option name") { + assertThat(result).isEqualTo("VESHV_DUMMY") + } + } + } + } + } +}) diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt new file mode 100644 index 00000000..f9f716a1 --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt @@ -0,0 +1,101 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import java.util.* +import javax.json.JsonObject + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class ResponsesTest : Spek({ + describe("response factory") { + describe("accepted response") { + given("uuid") { + val uuid = UUID.randomUUID() + + on("calling acceptedResponse") { + val result = Responses.acceptedResponse(uuid) + + it ("should have ACCEPTED status") { + assertThat(result.status).isEqualTo(HttpStatus.ACCEPTED) + } + + it ("should have text body") { + assertThat(result.content.type).isEqualTo(ContentType.TEXT) + } + + it ("should contain UUID text in the body") { + val serialized = result.content.serializer.run { result.content.value.show() } + assertThat(serialized).isEqualTo(uuid.toString()) + } + } + } + } + describe("status response") { + given("all params are specified") { + val status = "ok" + val message = "good job" + val httpStatus = HttpStatus.OK + + on("calling statusResponse") { + val result = Responses.statusResponse(status, message, httpStatus) + val json = result.content.value as JsonObject + + it ("should have OK status") { + assertThat(result.status).isEqualTo(HttpStatus.OK) + } + + it ("should have json body") { + assertThat(result.content.type).isEqualTo(ContentType.JSON) + } + + it ("should contain status as string") { + assertThat(json.getString("status")).isEqualTo(status) + } + + it ("should contain message") { + assertThat(json.getString("message")).isEqualTo(message) + } + } + } + + given("default params are omitted") { + val status = "ok" + val message = "good job" + + on("calling statusResponse") { + val result = Responses.statusResponse(status, message) + + it ("should have OK status") { + assertThat(result.status).isEqualTo(HttpStatus.OK) + } + } + } + } + } +}) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt index 060f28a2..754fa31f 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt @@ -19,11 +19,13 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api +import arrow.core.Either +import arrow.core.Option import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl import javax.json.JsonArray interface MessageParametersParser { - fun parse(request: JsonArray): List<MessageParameters> + fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>> companion object { val INSTANCE: MessageParametersParser by lazy { @@ -31,3 +33,5 @@ interface MessageParametersParser { } } } + +data class ParsingError(val message: String, val cause: Option<Throwable>) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt index 5b328f1c..f3095618 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt @@ -19,9 +19,12 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl +import arrow.core.Option +import arrow.core.Try import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import javax.json.JsonArray /** @@ -32,8 +35,8 @@ internal class MessageParametersParserImpl( private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser() ) : MessageParametersParser { - override fun parse(request: JsonArray): List<MessageParameters> = - try { + override fun parse(request: JsonArray) = + Try { request .map { it.asJsonObject() } .map { @@ -41,13 +44,13 @@ internal class MessageParametersParserImpl( .parse(it.getJsonObject("commonEventHeader")) val messageType = MessageType.valueOf(it.getString("messageType")) val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue() - ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.", - NullPointerException()) + ?: throw NullPointerException("\"messagesAmount\" could not be parsed from message.") MessageParameters(commonEventHeader, messageType, messagesAmount) } - } catch (e: Exception) { - throw ParsingException("Parsing request body failed", e) + }.toEither().mapLeft { ex -> + ParsingError( + ex.message ?: "Unable to parse message parameters", + Option.fromNullable(ex)) } - internal class ParsingException(message: String, cause: Exception) : Exception(message, cause) } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt index 92561995..3b1a48b3 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl.ParsingException import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl @@ -45,18 +44,20 @@ object MessageParametersParserTest : Spek({ it("should parse MessagesParameters object successfully") { val result = messageParametersParser.parse(validMessagesParametesJson()) - assertThat(result).isNotNull - assertThat(result).hasSize(2) - val firstMessage = result.first() - assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID) - assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT) + result.fold({ fail("should have succeeded") }) { rightResult -> + assertThat(rightResult).hasSize(2) + val firstMessage = rightResult.first() + assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID) + assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT) + + } } } + on("invalid parameters json") { it("should throw exception") { - assertThatExceptionOfType(ParsingException::class.java).isThrownBy { - messageParametersParser.parse(invalidMessagesParametesJson()) - } + val result = messageParametersParser.parse(invalidMessagesParametesJson()) + assertThat(result.isLeft()).describedAs("is left").isTrue() } } } diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml index d44e2511..cfe1dc14 100644 --- a/hv-collector-xnf-simulator/pom.xml +++ b/hv-collector-xnf-simulator/pom.xml @@ -106,6 +106,10 @@ <artifactId>arrow-effects</artifactId> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt deleted file mode 100644 index 02e6ee72..00000000 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.scheduler.Schedulers -import javax.json.Json - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 - */ -internal class HttpServer(private val vesClient: XnfSimulator, - private val messageParametersParser: MessageParametersParser = INSTANCE) { - - fun start(port: Int): IO<RatpackServer> = IO { - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::configureHandlers) - } - } - - private fun configureHandlers(chain: Chain) { - chain - .post("simulator/sync") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendIo(it) } - .map { it.unsafeRunSync() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .post("simulator/async") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendRx(it) } - .map { it.subscribeOn(Schedulers.elastic()).subscribe() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun sendAcceptedResponse(ctx: Context) { - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - - private fun handleException(t: Throwable, ctx: Context) { - logger.warn("Failed to process the request - ${t.localizedMessage}") - logger.debug("Exception thrown when processing the request", t) - ctx.response - .status(STATUS_BAD_REQUEST) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request was not accepted") - .add("exception", t.localizedMessage) - .build() - .toString()) - } - - companion object { - private val logger = Logger(HttpServer::class) - const val STATUS_OK = 200 - const val STATUS_BAD_REQUEST = 400 - const val CONTENT_TYPE_APPLICATION_JSON = "application/json" - } -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index e8a474d0..558bd1c1 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -19,98 +19,40 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.core.Either +import arrow.core.Some +import arrow.core.Try +import arrow.core.fix +import arrow.core.flatMap +import arrow.core.monad import arrow.effects.IO -import io.netty.handler.ssl.ClientAuth -import io.netty.handler.ssl.SslContext -import io.netty.handler.ssl.SslContextBuilder -import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient - +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import java.io.InputStream +import javax.json.Json /** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 */ -internal class XnfSimulator(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security)) - } - .build() - - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> { - sendRx(messages).block() - } - - fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { - val complete = ReplayProcessor.create<Void>(1) - client - .newHandler { _, output -> handler(complete, messages, output) } - .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, - nettyOutbound: NettyOutbound): Publisher<Void> { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(MAX_BATCH_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) - .then { - logger.info("Messages have been sent") - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): SslContext = - SslContextBuilder.forClient() - .keyManager(config.cert.toFile(), config.privateKey.toFile()) - .trustManager(config.trustedCert.toFile()) - .sslProvider(SslProvider.OPENSSL) - .clientAuth(ClientAuth.REQUIRE) - .build() - - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } - - companion object { - private val logger = Logger(XnfSimulator::class) - private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE - } +class XnfSimulator( + private val vesClient: VesHvClient, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = + Either.monad<ParsingError>().binding { + val json = parseJsonArray(messageParameters).bind() + val parsed = messageParametersParser.parse(json).bind() + val generatedMessages = messageGenerator.createMessageFlux(parsed) + vesClient.sendIo(generatedMessages) + }.fix() + + private fun parseJsonArray(jsonStream: InputStream) = + Try { + Json.createReader(jsonStream).readArray() + }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt new file mode 100644 index 00000000..22e47d75 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpClient + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class VesHvClient(private val configuration: SimulatorConfiguration) { + + private val client: TcpClient = TcpClient.builder() + .options { opts -> + opts.host(configuration.vesHost) + .port(configuration.vesPort) + .sslContext(createSslContext(configuration.security)) + } + .build() + + fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + sendRx(messages).then(Mono.just(Unit)).asIo() + + private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + val complete = ReplayProcessor.create<Void>(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } + + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<PayloadWireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { + + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) + val frames = messages + .map(encoder::encode) + .window(MAX_BATCH_SIZE) + + return nettyOutbound + .logConnectionClosed() + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() + } + + private fun createSslContext(config: SecurityConfiguration): SslContext = + SslContextBuilder.forClient() + .keyManager(config.cert.toFile(), config.privateKey.toFile()) + .trustManager(config.trustedCert.toFile()) + .sslProvider(SslProvider.OPENSSL) + .clientAuth(ClientAuth.REQUIRE) + .build() + + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + + companion object { + private val logger = Logger(VesHvClient::class) + private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt new file mode 100644 index 00000000..54ead6f7 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.utils.http.Content +import org.onap.dcae.collectors.veshv.utils.http.ContentType +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Response +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.TypedData +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import java.util.* +import javax.json.Json + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +internal class XnfApiServer( + private val xnfSimulator: XnfSimulator, + private val ongoingSimulations: OngoingSimulations) { + + fun start(port: Int): IO<RatpackServer> = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) + } + } + + private fun configureHandlers(chain: Chain) { + chain + .post("simulator", ::startSimulationHandler) + .post("simulator/async", ::startSimulationHandler) + .get("simulator/:id", ::simulatorStatusHandler) + .get("healthcheck") { ctx -> + logger.info("Checking health") + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + private fun startSimulationHandler(ctx: Context) { + logger.info("Starting asynchronous scenario") + ctx.request.body.then { body -> + val id = startSimulation(body) + ctx.response.sendEitherErrorOrResponse(id) + } + } + + private fun startSimulation(body: TypedData): Either<ParsingError, Response> { + return xnfSimulator.startSimulation(body.inputStream) + .map(ongoingSimulations::startAsynchronousSimulation) + .map(Responses::acceptedResponse) + } + + private fun simulatorStatusHandler(ctx: Context) { + val id = UUID.fromString(ctx.pathTokens["id"]) + val status = ongoingSimulations.status(id) + val response = Responses.statusResponse(status.toString(), status.message) + ctx.response.sendAndHandleErrors(IO.just(response)) + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 999d0327..56d6212a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import arrow.core.ForOption import arrow.core.Option diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 708ffd13..9b6ef209 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration @@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -internal data class SimulatorConfiguration( +data class SimulatorConfiguration( val listenPort: Int, val vesHost: String, val vesPort: Int, diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt new file mode 100644 index 00000000..95bb4897 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import arrow.effects.IO +import kotlinx.coroutines.experimental.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { + private val asyncSimulationContext = executor.asCoroutineDispatcher() + private val simulations = ConcurrentHashMap<UUID, Status>() + + fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { + val id = UUID.randomUUID() + simulations[id] = StatusOngoing + + simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> + result.fold( + { err -> + logger.warn("Error", err) + simulations[id] = StatusFailure(err) + }, + { + logger.info("Finished sending messages") + simulations[id] = StatusSuccess + } + ) + } + return id + } + + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() { + simulations.clear() + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} + +sealed class Status(val message: String) { + override fun toString() = this::class.simpleName ?: "null" +} + +object StatusNotFound : Status("not found") +object StatusOngoing : Status("ongoing") +object StatusSuccess : Status("success") +data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}") diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index fa6d626b..c9e900ac 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.void +import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map {config -> - XnfSimulator(config) - .let { HttpServer(it) } + .map { config -> + XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations()) .start(config.listenPort) - .void() + .unit() } .unsafeRunEitherSync( { ex -> diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt new file mode 100644 index 00000000..70d8ba83 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.effects.IO +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess +import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import java.time.Duration +import java.util.* +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class OngoingSimulationsTest : Spek({ + val executor = Executors.newSingleThreadExecutor() + val cut = OngoingSimulations(executor) + + describe("simulations repository") { + given("not existing task task id") { + val id = UUID.randomUUID() + + on("status") { + val result = cut.status(id) + + it("should have 'not found' status") { + assertThat(result).isEqualTo(StatusNotFound) + } + } + } + + given("never ending task") { + val task = IO.async<Unit> { } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have ongoing status") { + assertThat(cut.status(result)).isEqualTo(StatusOngoing) + } + } + } + + given("failing task") { + val cause = RuntimeException("facepalm") + val task = IO.raiseError<Unit>(cause) + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have failing status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusFailure(cause)) + } + } + } + } + + given("successful task") { + val task = IO { println("great success!") } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have successful status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusSuccess) + } + } + } + } + + afterGroup { + executor.shutdown() + } + } + + afterEachTest { cut.clear() } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt new file mode 100644 index 00000000..80f39579 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Left +import arrow.core.None +import arrow.core.Right +import arrow.effects.IO +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class XnfSimulatorTest : Spek({ + lateinit var cut: XnfSimulator + lateinit var vesClient: VesHvClient + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + + beforeEachTest { + vesClient = mock() + messageParametersParser = mock() + messageGenerator = mock() + cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator) + } + + describe("startSimulation") { + it("should fail when empty input stream") { + // given + val emptyInputStream = ByteInputStream() + + // when + val result = cut.startSimulation(emptyInputStream) + + // then + assertThat(result).isLeft() + } + + it("should fail when invalid JSON") { + // given + val invalidJson = "invalid json".byteInputStream() + + // when + val result = cut.startSimulation(invalidJson) + + // then + assertThat(result).isLeft() + } + + it("should fail when JSON syntax is valid but content is invalid") { + // given + val json = "[1,2,3]".byteInputStream() + val cause = ParsingError("epic fail", None) + whenever(messageParametersParser.parse(any())).thenReturn( + Left(cause)) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).left().isEqualTo(cause) + } + + it("should return generated messages") { + // given + val json = "[true]".byteInputStream() + val messageParams = listOf<MessageParameters>() + val generatedMessages = Flux.empty<PayloadWireFrameMessage>() + val sendingIo = IO {} + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) + whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).right().isSameAs(sendingIo) + } + } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt index 8749dc5b..69caf727 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt @@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError @@ -64,6 +64,7 @@ <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version> <jacoco.version>0.8.2</jacoco.version> + <jacoco.minimum.coverage>66</jacoco.minimum.coverage> <!-- Protocol buffers --> <protobuf.version>3.5.1</protobuf.version> @@ -261,6 +262,11 @@ </dependency> </dependencies> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.6.0</version> + </plugin> </plugins> </pluginManagement> <plugins> @@ -352,6 +358,20 @@ <profiles> <profile> + <id>docker-proxy</id> + <activation> + <property> + <name>docker.http_proxy</name> + </property> + </activation> + <properties> + <!-- set build args as defined in https://dmp.fabric8.io/#build-buildargs --> + <docker.buildArg.http_proxy>${docker.http_proxy}</docker.buildArg.http_proxy> + <docker.buildArg.https_proxy>${docker.http_proxy}</docker.buildArg.https_proxy> + </properties> + </profile> + + <profile> <id>docker</id> <activation> <property> @@ -428,15 +448,9 @@ </name> <registry>${docker-image.registry}</registry> <build> - <!-- - <args> - <http_proxy>${docker.http_proxy}</http_proxy> - <https_proxy>${docker.http_proxy}</https_proxy> - </args> - --> <dockerFileDir>${project.basedir}</dockerFileDir> <tags> - <tag>${project.version}-SNAPSHOT-${maven.build.timestamp}Z</tag> + <tag>${project.version}-${maven.build.timestamp}Z</tag> <tag>${project.version}</tag> <tag>latest</tag> </tags> @@ -514,6 +528,11 @@ <version>${kotlin.version}</version> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + <version>0.25.0</version> + </dependency> + <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> <version>${arrow.version}</version> @@ -549,6 +568,11 @@ <version>${arrow.version}</version> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects-reactor</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.3.0-alpha4</version> |