aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml6
-rw-r--r--hv-collector-analysis/pom.xml57
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt27
-rwxr-xr-xhv-collector-coverage/check-coverage.sh30
-rw-r--r--hv-collector-coverage/pom.xml24
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt2
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--hv-collector-dcae-app-simulator/pom.xml12
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt74
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt89
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt101
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt)29
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt)2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt)2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt)13
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt17
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt169
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt83
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt184
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt225
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt54
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt (renamed from hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt)2
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt2
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt2
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt13
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt2
-rw-r--r--hv-collector-main/src/main/resources/logback.xml6
-rw-r--r--hv-collector-test-utils/pom.xml5
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt62
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt (renamed from hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt)38
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt (renamed from hv-collector-test-utils/src/main/kotlin/configurations.kt)0
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt (renamed from hv-collector-test-utils/src/main/kotlin/messages.kt)0
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt (renamed from hv-collector-test-utils/src/main/kotlin/vesEvents.kt)0
-rw-r--r--hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker1
-rw-r--r--hv-collector-utils/pom.xml21
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt21
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt22
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt6
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt11
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt81
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt77
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt142
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt62
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt101
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt17
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt21
-rw-r--r--hv-collector-xnf-simulator/pom.xml4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt102
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt122
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt115
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt95
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt)2
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt)4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt76
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt15
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt107
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt114
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt6
-rw-r--r--pom.xml38
62 files changed, 2232 insertions, 498 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 33aedeca..f9f52b4e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -28,7 +28,7 @@ services:
command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
ves-hv-collector:
- image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
# build:
# context: hv-collector-main
# dockerfile: Dockerfile
@@ -51,7 +51,7 @@ services:
- ./ssl/:/etc/ves-hv/
xnf-simulator:
- image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
# build:
# context: hv-collector-xnf-simulator
# dockerfile: Dockerfile
@@ -64,7 +64,7 @@ services:
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
- image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
# build:
# context: hv-collector-dcae-app-simulator
# dockerfile: Dockerfile
diff --git a/hv-collector-analysis/pom.xml b/hv-collector-analysis/pom.xml
index a4d0a738..e9ffcf36 100644
--- a/hv-collector-analysis/pom.xml
+++ b/hv-collector-analysis/pom.xml
@@ -19,32 +19,41 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
- <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
- <artifactId>hv-collector-analysis</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <description>VES HighVolume Collector :: Code analysis configuration</description>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-analysis</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <description>VES HighVolume Collector :: Code analysis configuration</description>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project> \ No newline at end of file
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 6c256b72..3c85a9b1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
@@ -30,7 +31,7 @@ interface Collector {
fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Collector
+typealias CollectorProvider = () -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index a400ff32..d807a9e7 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
- return collector::get
+ return collector::getOption
}
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index f858d959..a34be7cd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.receive()
.retain()
- return collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ return collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
+
}
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
- logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
- context().channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Client disconnected because of idle timeout" }
- else
- logger.warn("Channel close failed", it.cause())
+ logger.info {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
}
+ disconnectClient()
}
return this
}
+ private fun NettyInbound.disconnectClient() {
+ context().channel().close().addListener {
+ if (it.isSuccess)
+ logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+
private fun NettyInbound.logConnectionClosed(): NettyInbound {
context().onClose {
logger.info("Connection from ${remoteAddress()} has been closed")
diff --git a/hv-collector-coverage/check-coverage.sh b/hv-collector-coverage/check-coverage.sh
new file mode 100755
index 00000000..956891ac
--- /dev/null
+++ b/hv-collector-coverage/check-coverage.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+JACOCO_REPORT="$1"
+MIN_COVERAGE_PERCENT="$2"
+LOG_FILE=target/check-coverage.log
+
+function coverage_from_report() {
+ local xpath_expr="string(/report/counter[@type='INSTRUCTION']/@$1)"
+ xpath -q -e "$xpath_expr" "$JACOCO_REPORT" 2>> ${LOG_FILE}
+}
+
+missed=$(coverage_from_report missed)
+covered=$(coverage_from_report covered)
+total=$(($missed + $covered))
+coverage=$((100 * $covered / $total))
+
+if [[ $(wc -c < ${LOG_FILE}) > 0 ]]; then
+ echo "Warnings from xpath evaluation:"
+ cat ${LOG_FILE}
+ echo
+fi
+
+echo "Coverage: $coverage% (covered/total: $covered/$total)"
+
+if [[ ${coverage} -lt ${MIN_COVERAGE_PERCENT} ]]; then
+ echo "Coverage is too low. Minimum coverage: $MIN_COVERAGE_PERCENT%"
+ exit 1
+fi
+
diff --git a/hv-collector-coverage/pom.xml b/hv-collector-coverage/pom.xml
index f988f8ec..31450918 100644
--- a/hv-collector-coverage/pom.xml
+++ b/hv-collector-coverage/pom.xml
@@ -60,7 +60,7 @@
</goals>
<configuration>
<excludes>
- <!-- Exclute Protobuf-generated classes -->
+ <!-- Exclude Protobuf-generated classes -->
<exclude>org/onap/ves/*</exclude>
</excludes>
<dataFileIncludes>
@@ -71,6 +71,28 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-coverage</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <skip>${skipTests}</skip>
+ <executable>${project.basedir}/check-coverage.sh</executable>
+ <workingDirectory>${project.basedir}</workingDirectory>
+ <arguments>
+ <argument>target/site/jacoco-aggregate/jacoco.xml</argument>
+ <argument>${jacoco.minimum.coverage}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 1e22d4c0..ba29844a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -149,7 +149,7 @@ object PerformanceSpecification : Spek({
val outputDigest = digest.digest()
- assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
+ assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
assertThat(outputDigest).isEqualTo(inputDigest)
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e9b70578..942e6edf 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.getOrElse
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider()
+ get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index 47f71ba6..ce4a2715 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -19,8 +19,8 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<licenses>
@@ -105,6 +105,14 @@
<artifactId>arrow-effects</artifactId>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-reactor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-core</artifactId>
</dependency>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
new file mode 100644
index 00000000..262e05bf
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.getOrElse
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
+ private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+ private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+
+ fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
+
+ fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
+ if (topics.any { it.isBlank() })
+ throw IllegalArgumentException("Topic list cannot contain empty elements")
+ if (topics.isEmpty())
+ throw IllegalArgumentException("Topic list cannot be empty")
+
+ logger.info("Received new configuration. Creating consumer for topics: $topics")
+ consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
+ }.fix()
+
+ fun state() = consumerState.getOption().map { it.currentState() }
+
+ fun resetState(): IO<Unit> = consumerState.getOption().fold(
+ { IO.unit },
+ { it.reset() }
+ )
+
+ fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
+
+ private fun currentMessages(): List<ByteArray> =
+ consumerState.getOption()
+ .map { it.currentState().consumedMessages }
+ .getOrElse(::emptyList)
+
+ private fun extractTopics(topicsString: String): Set<String> =
+ topicsString.substringAfter("=")
+ .split(",")
+ .toSet()
+
+ companion object {
+ private val logger = Logger(DcaeAppSimulator::class)
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
new file mode 100644
index 00000000..354edaeb
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5
+import java.io.InputStream
+import javax.json.Json
+
+class MessageStreamValidation(
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+ private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+ fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
+ IO.monadError().bindingCatch {
+ val messageParams = parseMessageParams(jsonDescription)
+ val expectedEvents = generateEvents(messageParams).bind()
+ val actualEvents = decodeConsumedEvents(consumedMessages)
+ if (shouldValidatePayloads(messageParams)) {
+ expectedEvents == actualEvents
+ } else {
+ validateHeaders(actualEvents, expectedEvents)
+ }
+ }.fix()
+
+ private fun parseMessageParams(input: InputStream): List<MessageParameters> {
+ val expectations = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(expectations)
+
+ return messageParams.fold(
+ { throw IllegalArgumentException("Parsing error: " + it.message) },
+ {
+ if (it.isEmpty())
+ throw IllegalArgumentException("Message param list cannot be empty")
+ it
+ }
+ )
+ }
+
+ private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
+ parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+
+
+ private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+ val consumedHeaders = actual.map { it.commonEventHeader }
+ val generatedHeaders = expected.map { it.commonEventHeader }
+ return generatedHeaders == consumedHeaders
+ }
+
+
+ private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+ messageGenerator.createMessageFlux(parameters)
+ .map(PayloadWireFrameMessage::payload)
+ .map(ByteData::unsafeAsArray)
+ .map(VesEventV5.VesEvent::parseFrom)
+ .collectList()
+ .asIo()
+
+ private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
+ consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
new file mode 100644
index 00000000..1eca9317
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
+import ratpack.handling.Chain
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
+ private val responseValid by lazy {
+ Responses.statusResponse(
+ name = "valid",
+ message = "validation succeeded"
+ )
+ }
+
+ private val responseInvalid by lazy {
+ Responses.statusResponse(
+ name = "invalid",
+ message = "validation failed",
+ httpStatus = HttpStatus.BAD_REQUEST
+ )
+ }
+
+
+ fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+ simulator.listenToTopics(kafkaTopics).map {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(::setupHandlers)
+ }
+ }
+
+ private fun setupHandlers(chain: Chain) {
+ chain
+ .put("configuration/topics") { ctx ->
+ ctx.request.body.then { body ->
+ val operation = simulator.listenToTopics(body.text)
+ ctx.response.sendOrError(operation)
+ }
+
+ }
+ .delete("messages") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ ctx.response.sendOrError(simulator.resetState())
+ }
+ .get("messages/all/count") { ctx ->
+ simulator.state().fold(
+ { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
+ {
+ ctx.response
+ .contentType(CONTENT_TEXT)
+ .send(it.messagesCount.toString())
+ })
+ }
+ .post("messages/all/validate") { ctx ->
+ ctx.request.body.then { body ->
+ val response = simulator.validate(body.inputStream)
+ .map { isValid ->
+ if (isValid) responseValid else responseInvalid
+ }
+ ctx.response.sendAndHandleErrors(response)
+ }
+ }
+ .get("healthcheck") { ctx ->
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index d53609ca..15965174 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -17,15 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import arrow.effects.IO
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
+import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
-import java.util.*
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +36,7 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
fun start(): IO<Consumer> = IO {
val consumer = Consumer()
- receiver.receive().subscribe(consumer::update)
+ receiver.receive().map(consumer::update).evaluateIo().subscribe()
consumer
}
@@ -43,18 +44,22 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
private val logger = Logger(KafkaSource::class)
fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
- val props = HashMap<String, Any>()
- props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator"
- props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators"
- props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
- val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
+ return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
+ }
+
+ fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
+ val props = mapOf<String, Any>(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+ ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
+ ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+ )
+ return ReceiverOptions.create<ByteArray, ByteArray>(props)
.addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
.addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
.subscription(topics)
- return KafkaSource(KafkaReceiver.create(receiverOptions))
}
}
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index 065cdf92..d5f55605 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
import arrow.core.ForOption
import arrow.core.Option
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
index 5bd2d155..c114313d 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
data class DcaeAppSimConfiguration(
val apiPort: Int,
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 08bb149f..1eefdbdb 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -17,9 +17,10 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
import java.util.concurrent.ConcurrentLinkedQueue
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
val messagesCount: Int by lazy {
messages.size
}
@@ -53,19 +54,17 @@ class Consumer : ConsumerStateProvider {
consumedMessages.clear()
}
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
-
companion object {
private val logger = Logger(Consumer::class)
}
}
class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
- return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
- }
+ fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 9f84fc4d..c0f8b340 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,13 +20,14 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -50,7 +51,7 @@ fun main(args: Array<String>) =
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+ return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
.start(config.apiPort, config.kafkaTopics)
- .void()
-} \ No newline at end of file
+ .unit()
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
deleted file mode 100644
index cd258134..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
-import org.onap.ves.VesEventV5.VesEvent
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
-import javax.json.Json
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class ApiServer(private val consumerFactory: ConsumerFactory,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
-
- private lateinit var consumerState: ConsumerStateProvider
-
- fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
- consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(this::setupHandlers)
- }
- }
-
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { it ->
- val topics = extractTopics(it.text)
- logger.info("Received new configuration. Creating consumer for topics: $topics")
- consumerState = consumerFactory.createConsumerForTopics(topics)
- ctx.response
- .status(STATUS_OK)
- .send()
- }
-
- }
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- consumerState.reset()
- .unsafeRunAsync {
- it.fold(
- { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
- { ctx.response.status(STATUS_OK) }
- ).send()
- }
- }
- .get("messages/all/count") { ctx ->
- val state = consumerState.currentState()
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(state.messagesCount.toString())
- }
- .post("messages/all/validate") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { generateEvents(ctx, it) }
- .then { (generatedEvents, shouldValidatePayloads) ->
- generatedEvents
- .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
- .block()
- }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
- }
- }
-
- private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
- Pair<Mono<List<VesEvent>>, Boolean> = Pair(
-
- doGenerateEvents(parameters).doOnError {
- logger.error("Error occurred when generating messages: $it")
- ctx.response
- .status(STATUS_INTERNAL_SERVER_ERROR)
- .send()
- },
- parameters.all { it.messageType == FIXED_PAYLOAD }
- )
-
- private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
- .createMessageFlux(parameters)
- .map(PayloadWireFrameMessage::payload)
- .map { decode(it.unsafeAsArray()) }
- .collectList()
-
-
- private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
-
-
- private fun sendResponse(ctx: Context,
- generatedEvents: List<VesEvent>,
- shouldValidatePayloads: Boolean) =
- resolveResponseStatusCode(
- generated = generatedEvents,
- consumed = decodeConsumedEvents(),
- validatePayloads = shouldValidatePayloads
- ).let { ctx.response.status(it).send() }
-
-
- private fun decodeConsumedEvents(): List<VesEvent> = consumerState
- .currentState()
- .consumedMessages
- .map(::decode)
-
-
- private fun resolveResponseStatusCode(generated: List<VesEvent>,
- consumed: List<VesEvent>,
- validatePayloads: Boolean): Int =
- if (validatePayloads) {
- if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
- } else {
- validateHeaders(consumed, generated)
- }
-
- private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
- val consumedHeaders = consumed.map { it.commonEventHeader }
- val generatedHeaders = generated.map { it.commonEventHeader }
- return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
- }
-
- private fun extractTopics(it: String): Set<String> =
- it.substringAfter("=")
- .split(",")
- .toSet()
-
- companion object {
- private val logger = Logger(ApiServer::class)
- private const val CONTENT_TEXT = "text/plain"
-
- private const val STATUS_OK = 200
- private const val STATUS_BAD_REQUEST = 400
- private const val STATUS_INTERNAL_SERVER_ERROR = 500
- }
-}
-
-
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
new file mode 100644
index 00000000..debe9554
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import reactor.kafka.receiver.ReceiverRecord
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class ConsumerTest : Spek({
+
+ lateinit var cut: Consumer
+
+ beforeEachTest {
+ cut = Consumer()
+ }
+
+ describe("Consumer which holds the state of received Kafka records") {
+ it("should contain empty state in the beginning") {
+ assertEmptyState(cut)
+ }
+
+ describe("update") {
+ val value = byteArrayOf(2)
+
+ beforeEachTest {
+ cut.update(receiverRecord(
+ topic = "topic",
+ key = byteArrayOf(1),
+ value = value
+ )).unsafeRunSync()
+ }
+
+ it("should contain one message if it was updated once") {
+ assertState(cut, value)
+ }
+
+ it("should contain empty state message if it was reset after update") {
+ cut.reset().unsafeRunSync()
+ assertEmptyState(cut)
+ }
+ }
+ }
+})
+
+fun assertEmptyState(cut: Consumer) {
+ assertState(cut)
+}
+
+fun assertState(cut: Consumer, vararg values: ByteArray) {
+ assertThat(cut.currentState().consumedMessages)
+ .containsOnly(*values)
+ assertThat(cut.currentState().messagesCount)
+ .isEqualTo(values.size)
+}
+
+fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+ ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
new file mode 100644
index 00000000..c0ba5812
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -0,0 +1,184 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Some
+import arrow.effects.IO
+import com.google.protobuf.ByteString
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.eq
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.never
+import com.nhaarman.mockito_kotlin.verify
+import com.nhaarman.mockito_kotlin.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anySet
+import org.mockito.Mockito
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import java.util.concurrent.ConcurrentLinkedQueue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class DcaeAppSimulatorTest : Spek({
+ lateinit var consumerFactory: ConsumerFactory
+ lateinit var messageStreamValidation: MessageStreamValidation
+ lateinit var consumer: Consumer
+ lateinit var cut: DcaeAppSimulator
+
+ beforeEachTest {
+ consumerFactory = mock()
+ messageStreamValidation = mock()
+ consumer = mock()
+ cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
+
+ whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
+ }
+
+ fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
+
+ describe("listenToTopics") {
+ val topics = setOf("hvMeas", "faults")
+
+ it("should fail when topic list is empty") {
+ val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ it("should fail when topic list contains empty strings") {
+ val result = cut.listenToTopics(setOf("hvMeas", " ", "faults")).attempt().unsafeRunSync()
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ it("should subscribe to given topics") {
+ cut.listenToTopics(topics).unsafeRunSync()
+ verify(consumerFactory).createConsumerForTopics(topics)
+ }
+
+ it("should subscribe to given topics when called with comma separated list") {
+ cut.listenToTopics("hvMeas,faults").unsafeRunSync()
+ verify(consumerFactory).createConsumerForTopics(topics)
+ }
+
+ it("should handle errors") {
+ // given
+ val error = RuntimeException("WTF")
+ whenever(consumerFactory.createConsumerForTopics(anySet()))
+ .thenReturn(IO.raiseError(error))
+
+ // when
+ val result = cut.listenToTopics("hvMeas").attempt().unsafeRunSync()
+
+ // then
+ assertThat(result).isEqualTo(Left(error))
+ }
+ }
+
+ describe("state") {
+
+ it("should return None when topics hasn't been initialized") {
+ assertThat(cut.state()).isEqualTo(None)
+ }
+
+ describe("when topics are initialized") {
+ beforeEachTest {
+ cut.listenToTopics("hvMeas").unsafeRunSync()
+ }
+
+ it("should return some state when it has been set") {
+ val state = consumerState()
+ whenever(consumer.currentState()).thenReturn(state)
+
+ assertThat(cut.state()).isEqualTo(Some(state))
+ }
+ }
+ }
+
+ describe("resetState") {
+ it("should do nothing when topics hasn't been initialized") {
+ cut.resetState().unsafeRunSync()
+ verify(consumer, never()).reset()
+ }
+
+ describe("when topics are initialized") {
+ beforeEachTest {
+ cut.listenToTopics("hvMeas").unsafeRunSync()
+ }
+
+ it("should reset the state") {
+ // given
+ whenever(consumer.reset()).thenReturn(IO.unit)
+
+ // when
+ cut.resetState().unsafeRunSync()
+
+ // then
+ verify(consumer).reset()
+ }
+ }
+ }
+
+ describe("validate") {
+ beforeEachTest {
+ whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
+ }
+
+ it("should use empty list when consumer is unavailable") {
+ // when
+ val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+ // then
+ verify(messageStreamValidation).validate(any(), eq(emptyList()))
+ assertThat(result).isTrue()
+ }
+
+ it("should delegate to MessageStreamValidation") {
+ // given
+ cut.listenToTopics("hvMeas").unsafeRunSync()
+ whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
+
+ // when
+ val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+ // then
+ verify(messageStreamValidation).validate(any(), any())
+ assertThat(result).isTrue()
+ }
+ }
+})
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+ return VesEvent.newBuilder()
+ .setCommonEventHeader(CommonEventHeader.newBuilder()
+ .setEventId(eventId))
+ .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .build()
+}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
new file mode 100644
index 00000000..2932367b
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -0,0 +1,225 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.core.Some
+import arrow.effects.IO
+import javax.json.stream.JsonParsingException
+import com.google.protobuf.ByteString
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.never
+import com.nhaarman.mockito_kotlin.verify
+import com.nhaarman.mockito_kotlin.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anyList
+import org.mockito.ArgumentMatchers.anySet
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import java.util.concurrent.ConcurrentLinkedQueue
+import javax.json.Json
+import javax.json.JsonArray
+import javax.json.JsonValue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class MessageStreamValidationTest : Spek({
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+ lateinit var cut: MessageStreamValidation
+
+ beforeEachTest {
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+ }
+
+ fun givenParsedMessageParameters(vararg params: MessageParameters) {
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
+ }
+
+ describe("validate") {
+
+ it("should return error when JSON is invalid") {
+ // when
+ val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
+
+ // then
+ when(result) {
+ is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
+ else -> fail("validation should fail")
+ }
+ }
+
+ it("should return error when message param list is empty") {
+ // given
+ givenParsedMessageParameters()
+
+ // when
+ val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
+
+ // then
+ assertThat(result.isLeft()).isTrue()
+ }
+
+ describe("when validating headers only") {
+ it("should return true when messages are the same") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val event = vesEvent()
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+ val receivedMessageBytes = event.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return true when messages differ with payload only") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent(payload = "payload A")
+ val receivedEvent = vesEvent(payload = "payload B")
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return false when messages are different") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent()
+ val receivedEvent = vesEvent(eventId = "bbb")
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+ }
+
+ describe("when validating whole messages") {
+ it("should return true when messages are the same") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val event = vesEvent()
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+ val receivedMessageBytes = event.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isTrue()
+ }
+
+ it("should return false when messages differ with payload only") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent(payload = "payload A")
+ val receivedEvent = vesEvent(payload = "payload B")
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+
+ it("should return false when messages are different") {
+ // given
+ val jsonAsStream = sampleJsonAsStream()
+ val generatedEvent = vesEvent()
+ val receivedEvent = vesEvent("bbb")
+ val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val receivedMessageBytes = receivedEvent.toByteArray()
+
+ givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+ // when
+ val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+ // then
+ assertThat(result).isFalse()
+ }
+ }
+ }
+})
+
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+ return VesEvent.newBuilder()
+ .setCommonEventHeader(CommonEventHeader.newBuilder()
+ .setEventId(eventId))
+ .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .build()
+}
+
+private const val sampleJsonArray = """["headersOnly"]"""
+
+private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
new file mode 100644
index 00000000..de74f628
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class KafkaSourceTest : Spek({
+ val servers = "kafka1:9080,kafka2:9080"
+ val topics = setOf("topic1", "topic2")
+
+ describe("receiver options") {
+ val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
+
+ fun verifyProperty(key: String, expectedValue: Any) {
+ it("should have $key option set") {
+ assertThat(options.consumerProperty(key))
+ .isEqualTo(expectedValue)
+ }
+ }
+
+ verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
+ verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator")
+ verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators")
+ verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+ verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
index 7d887939..e7a22fcf 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index b2e42509..c61ab266 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameEncoder(private val allocator: ByteBufAllocator) {
+class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
fun encode(frame: PayloadWireFrameMessage): ByteBuf {
val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 89d1f32e..fa63c36e 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -40,7 +40,7 @@ import kotlin.test.fail
*/
object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
- val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
+ val encoder = WireFrameEncoder()
val decoder = WireFrameDecoder()
fun createSampleFrame() =
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
index 79fc9321..1adf0cad 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-enum class HealthStatus(val httpResponseStatus: Int) {
- UP(OK),
- DOWN(SERVICE_UNAVAILABLE),
- OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
- UNKNOWN(SERVICE_UNAVAILABLE)
+enum class HealthStatus(val httpResponseStatus: HttpStatus) {
+ UP(HttpStatus.OK),
+ DOWN(HttpStatus.SERVICE_UNAVAILABLE),
+ OUT_OF_SERVICE(HttpStatus.SERVICE_UNAVAILABLE),
+ UNKNOWN(HttpStatus.SERVICE_UNAVAILABLE)
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 7e9efac7..753f73ef 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -51,7 +51,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por
private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
healthDescription.get().run {
- resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+ resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
}
private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
index 5127e7ef..a0235e17 100644
--- a/hv-collector-main/src/main/resources/logback.xml
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -27,9 +27,9 @@
</appender>
<logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
<!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
<root level="INFO">
diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml
index 3960e399..3b6c0e89 100644
--- a/hv-collector-test-utils/pom.xml
+++ b/hv-collector-test-utils/pom.xml
@@ -51,5 +51,10 @@
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt
new file mode 100644
index 00000000..54913744
--- /dev/null
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import arrow.core.identity
+import org.assertj.core.api.AbstractAssert
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.ObjectAssert
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+class EitherAssert<A, B>(actual: Either<A, B>)
+ : AbstractAssert<EitherAssert<A, B>, Either<A, B>>(actual, EitherAssert::class.java) {
+
+ fun isLeft(): EitherAssert<A, B> {
+ isNotNull()
+ isInstanceOf(Either.Left::class.java)
+ return myself
+ }
+
+ fun left(): ObjectAssert<A> {
+ isLeft()
+ val left = actual.fold(
+ ::identity,
+ { throw AssertionError("should be left") })
+ return assertThat(left)
+ }
+
+ fun isRight(): EitherAssert<A, B> {
+ isNotNull()
+ isInstanceOf(Either.Right::class.java)
+ return myself
+ }
+
+ fun right(): ObjectAssert<B> {
+ isRight()
+ val right = actual.fold(
+ { throw AssertionError("should be right") },
+ ::identity)
+ return assertThat(right)
+ }
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
index 081dd0da..d017b31b 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
@@ -17,15 +17,39 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.utils.http
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
*/
-class Status {
- companion object {
- const val OK = 200
- const val SERVICE_UNAVAILABLE = 503
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
+
+object Assertions : org.assertj.core.api.Assertions() {
+ fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+}
+
+
+fun waitUntilSucceeds(action: () -> Unit) = waitUntilSucceeds(50, Duration.ofMillis(10), action)
+
+fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
+ var tryNum = 0
+ while (tryNum <= retries) {
+ tryNum++
+ try {
+ logger.debug("Try number $tryNum")
+ action()
+ break
+ } catch (ex: Throwable) {
+ if (tryNum >= retries)
+ throw ex
+ else
+ Thread.sleep(sleepTime.toMillis())
+ }
}
}
diff --git a/hv-collector-test-utils/src/main/kotlin/configurations.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt
index 57843b45..57843b45 100644
--- a/hv-collector-test-utils/src/main/kotlin/configurations.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt
diff --git a/hv-collector-test-utils/src/main/kotlin/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index c6aa89b2..c6aa89b2 100644
--- a/hv-collector-test-utils/src/main/kotlin/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
diff --git a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index 6aeb6206..6aeb6206 100644
--- a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
diff --git a/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 00000000..ca6ee9ce
--- /dev/null
+++ b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline \ No newline at end of file
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index f1b7f061..81daf9b2 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -85,6 +85,20 @@
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -121,7 +135,10 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
-
-
</project> \ No newline at end of file
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index 39964c1e..a99fef5e 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -20,12 +20,31 @@
package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
import arrow.core.identity
+import arrow.syntax.collections.firstOption
+import java.util.*
+import java.util.concurrent.atomic.AtomicReference
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since July 2018
*/
-
fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
+
+fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
+
+fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+
+fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
+
+fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
+ if (firstValue != null)
+ Option.just(firstValue)
+ else nextValues.asSequence()
+ .map { it() }
+ .filter { it != null }
+ .firstOption()
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index e37b0d7d..05d13094 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -20,7 +20,11 @@
package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
import arrow.effects.IO
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import kotlin.system.exitProcess
/**
@@ -46,4 +50,20 @@ fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitC
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
-fun IO<Any>.void() = map { Unit }
+fun IO<Any>.unit() = map { Unit }
+
+fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
+ subscribe({
+ callback(Right(it))
+ }, {
+ callback(Left(it))
+ })
+}
+
+fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
+ flatMap { io ->
+ io.attempt().unsafeRunSync().fold(
+ { Flux.error<T>(it) },
+ { Flux.just<T>(it) }
+ )
+ }
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
index 16634889..1ebe4e48 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
@@ -27,6 +27,7 @@ import arrow.core.getOrElse
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.Options
+import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
import java.io.File
import java.nio.file.Path
import java.nio.file.Paths
@@ -77,6 +78,7 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI())
- private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption): Option<String> =
- Option.fromNullable(getOptionValue(cmdLineOpt.option.opt))
+ private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption) = Option.fromNullablesChain(
+ getOptionValue(cmdLineOpt.option.opt),
+ { System.getenv(cmdLineOpt.environmentVariableName()) })
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 836a05df..3a154db2 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -117,5 +117,14 @@ enum class CommandLineOption(val option: Option) {
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
.build()
- ),
+ );
+
+ fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
+ option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
+ "${prefix}_${mainPart}"
+ }
+
+ companion object {
+ private const val DEFAULT_ENV_PREFIX = "VESHV"
+ }
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
new file mode 100644
index 00000000..c5c46397
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.typeclasses.Show
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+object HttpConstants {
+ const val STATUS_OK = 200
+ const val STATUS_ACCEPTED = 202
+ const val STATUS_BAD_REQUEST = 400
+ const val STATUS_NOT_FOUND = 404
+ const val STATUS_INTERNAL_SERVER_ERROR = 500
+ const val STATUS_SERVICE_UNAVAILABLE = 503
+
+ const val CONTENT_TYPE_JSON = "application/json"
+ const val CONTENT_TYPE_TEXT = "text/plain"
+}
+
+enum class HttpStatus(val number: Int) {
+ OK(HttpConstants.STATUS_OK),
+ ACCEPTED(HttpConstants.STATUS_ACCEPTED),
+ BAD_REQUEST(HttpConstants.STATUS_BAD_REQUEST),
+ NOT_FOUND(HttpConstants.STATUS_NOT_FOUND),
+ INTERNAL_SERVER_ERROR(HttpConstants.STATUS_INTERNAL_SERVER_ERROR),
+ SERVICE_UNAVAILABLE(HttpConstants.STATUS_SERVICE_UNAVAILABLE)
+}
+
+
+enum class ContentType(val value: String) {
+ JSON(HttpConstants.CONTENT_TYPE_JSON),
+ TEXT(HttpConstants.CONTENT_TYPE_TEXT)
+}
+
+data class Response(val status: HttpStatus, val content: Content<Any>)
+data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any())
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+object Responses {
+
+ fun acceptedResponse(id: UUID): Response {
+ return Response(
+ HttpStatus.ACCEPTED,
+ Content(ContentType.TEXT, id)
+ )
+ }
+
+ fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response {
+ return Response(httpStatus,
+ Content(ContentType.JSON,
+ Json.createObjectBuilder()
+ .add("status", name)
+ .add("message", message)
+ .build()))
+ }
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
new file mode 100644
index 00000000..0282d0c7
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import javax.json.Json
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
+
+fun ratpack.http.Response.sendOrError(action: IO<Unit>) {
+ sendAndHandleErrors(action.map {
+ Response(
+ HttpStatus.OK,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("response", "Request accepted").build()))
+ })
+}
+
+fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
+ when(response) {
+ is Either.Left -> send(errorResponse(response.a.toString()))
+ is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+ }
+}
+
+fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
+ response.attempt().unsafeRunSync().fold(
+ { err ->
+ logger.warn("Error occurred. Sending .", err)
+ val message = err.message
+ send(errorResponse(message))
+ },
+ ::send
+ )
+}
+
+private fun errorResponse(message: String?): Response {
+ return Response(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("error", message).build()))
+}
+
+fun ratpack.http.Response.send(response: Response) {
+ val respWithStatus = status(response.status.number)
+ response.content.apply {
+ respWithStatus.send(
+ type.value,
+ serializer.run { value.show() })
+ }
+}
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt
new file mode 100644
index 00000000..29359439
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt
@@ -0,0 +1,142 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.arrow
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.jetbrains.spek.api.dsl.xdescribe
+import java.util.concurrent.atomic.AtomicReference
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class CoreKtTest : Spek({
+ describe("AtomicReference.getOption") {
+ given("empty atomic reference") {
+ val atomicReference = AtomicReference<String>()
+
+ on("getOption") {
+ val result = atomicReference.getOption()
+
+ it("should be None") {
+ assertThat(result).isEqualTo(None)
+ }
+ }
+ }
+ given("non-empty atomic reference") {
+ val initialValue = "reksio"
+ val atomicReference = AtomicReference(initialValue)
+
+ on("getOption") {
+ val result = atomicReference.getOption()
+
+ it("should be Some($initialValue)") {
+ assertThat(result).isEqualTo(Some(initialValue))
+ }
+ }
+ }
+ }
+
+ describe("Option.fromNullablesChain") {
+ given("one non-null element") {
+ val just = "some text"
+ on("calling factory") {
+ val result = Option.fromNullablesChain(just)
+
+ it("should return Some($just)") {
+ assertThat(result).isEqualTo(Some(just))
+ }
+ }
+ }
+
+ given("one null element") {
+ val just: String? = null
+ on("calling factory") {
+ val result = Option.fromNullablesChain(just)
+
+ it("should return None") {
+ assertThat(result).isEqualTo(None)
+ }
+ }
+ }
+
+ given("first non-null element") {
+ val first = "some text"
+ val second: String? = null
+ var secondAskedForValue = false
+ on("calling factory") {
+ val result = Option.fromNullablesChain(first, { secondAskedForValue = true; second })
+
+ it("should return Some($first)") {
+ assertThat(result).isEqualTo(Some(first))
+ }
+
+ it("should have not called second provider (should be lazy)") {
+ assertThat(secondAskedForValue).isFalse()
+ }
+ }
+ }
+
+ given("two non-null elements") {
+ val first = "some text"
+ val second = "another text"
+ on("calling factory") {
+ val result = Option.fromNullablesChain(first, { second })
+
+ it("should return Some($first)") {
+ assertThat(result).isEqualTo(Some(first))
+ }
+ }
+ }
+
+ given("two null elements") {
+ val first: String? = null
+ val second: String? = null
+ on("calling factory") {
+ val result = Option.fromNullablesChain(first, { second })
+
+ it("should return None") {
+ assertThat(result).isEqualTo(None)
+ }
+ }
+ }
+
+ given("second non-null element") {
+ val first: String? = null
+ val second = "another text"
+ on("calling factory") {
+ val result = Option.fromNullablesChain(first, { second })
+
+ it("should return Some($second)") {
+ assertThat(result).isEqualTo(Some(second))
+ }
+ }
+ }
+ }
+})
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt
new file mode 100644
index 00000000..f36df043
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOptionTest.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.commandline
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+class CommandLineOptionTest : Spek({
+ describe("command line options enum") {
+ describe("environment variables") {
+ given("sample option and prefix") {
+ val opt = CommandLineOption.KAFKA_SERVERS
+ val prefix = "CONFIG"
+
+ on("calling environmentVariableName") {
+ val result = opt.environmentVariableName(prefix)
+
+ it("should return prefixed upper snake cased long option name") {
+ assertThat(result).isEqualTo("CONFIG_KAFKA_BOOTSTRAP_SERVERS")
+ }
+ }
+ }
+
+ given("sample option without prefix") {
+ val opt = CommandLineOption.DUMMY_MODE
+
+ on("calling environmentVariableName") {
+ val result = opt.environmentVariableName()
+
+ it("should return prefixed upper snake cased long option name") {
+ assertThat(result).isEqualTo("VESHV_DUMMY")
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt
new file mode 100644
index 00000000..f9f716a1
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.*
+import javax.json.JsonObject
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class ResponsesTest : Spek({
+ describe("response factory") {
+ describe("accepted response") {
+ given("uuid") {
+ val uuid = UUID.randomUUID()
+
+ on("calling acceptedResponse") {
+ val result = Responses.acceptedResponse(uuid)
+
+ it ("should have ACCEPTED status") {
+ assertThat(result.status).isEqualTo(HttpStatus.ACCEPTED)
+ }
+
+ it ("should have text body") {
+ assertThat(result.content.type).isEqualTo(ContentType.TEXT)
+ }
+
+ it ("should contain UUID text in the body") {
+ val serialized = result.content.serializer.run { result.content.value.show() }
+ assertThat(serialized).isEqualTo(uuid.toString())
+ }
+ }
+ }
+ }
+ describe("status response") {
+ given("all params are specified") {
+ val status = "ok"
+ val message = "good job"
+ val httpStatus = HttpStatus.OK
+
+ on("calling statusResponse") {
+ val result = Responses.statusResponse(status, message, httpStatus)
+ val json = result.content.value as JsonObject
+
+ it ("should have OK status") {
+ assertThat(result.status).isEqualTo(HttpStatus.OK)
+ }
+
+ it ("should have json body") {
+ assertThat(result.content.type).isEqualTo(ContentType.JSON)
+ }
+
+ it ("should contain status as string") {
+ assertThat(json.getString("status")).isEqualTo(status)
+ }
+
+ it ("should contain message") {
+ assertThat(json.getString("message")).isEqualTo(message)
+ }
+ }
+ }
+
+ given("default params are omitted") {
+ val status = "ok"
+ val message = "good job"
+
+ on("calling statusResponse") {
+ val result = Responses.statusResponse(status, message)
+
+ it ("should have OK status") {
+ assertThat(result.status).isEqualTo(HttpStatus.OK)
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
index 060f28a2..754fa31f 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
@@ -19,11 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
+import arrow.core.Either
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
import javax.json.JsonArray
interface MessageParametersParser {
- fun parse(request: JsonArray): List<MessageParameters>
+ fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>>
companion object {
val INSTANCE: MessageParametersParser by lazy {
@@ -31,3 +33,5 @@ interface MessageParametersParser {
}
}
}
+
+data class ParsingError(val message: String, val cause: Option<Throwable>)
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
index 5b328f1c..f3095618 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
@@ -19,9 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+import arrow.core.Option
+import arrow.core.Try
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import javax.json.JsonArray
/**
@@ -32,8 +35,8 @@ internal class MessageParametersParserImpl(
private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()
) : MessageParametersParser {
- override fun parse(request: JsonArray): List<MessageParameters> =
- try {
+ override fun parse(request: JsonArray) =
+ Try {
request
.map { it.asJsonObject() }
.map {
@@ -41,13 +44,13 @@ internal class MessageParametersParserImpl(
.parse(it.getJsonObject("commonEventHeader"))
val messageType = MessageType.valueOf(it.getString("messageType"))
val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue()
- ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.",
- NullPointerException())
+ ?: throw NullPointerException("\"messagesAmount\" could not be parsed from message.")
MessageParameters(commonEventHeader, messageType, messagesAmount)
}
- } catch (e: Exception) {
- throw ParsingException("Parsing request body failed", e)
+ }.toEither().mapLeft { ex ->
+ ParsingError(
+ ex.message ?: "Unable to parse message parameters",
+ Option.fromNullable(ex))
}
- internal class ParsingException(message: String, cause: Exception) : Exception(message, cause)
}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
index 92561995..3b1a48b3 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
@@ -20,13 +20,12 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl.ParsingException
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
@@ -45,18 +44,20 @@ object MessageParametersParserTest : Spek({
it("should parse MessagesParameters object successfully") {
val result = messageParametersParser.parse(validMessagesParametesJson())
- assertThat(result).isNotNull
- assertThat(result).hasSize(2)
- val firstMessage = result.first()
- assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
- assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+ result.fold({ fail("should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(2)
+ val firstMessage = rightResult.first()
+ assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+ assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+
+ }
}
}
+
on("invalid parameters json") {
it("should throw exception") {
- assertThatExceptionOfType(ParsingException::class.java).isThrownBy {
- messageParametersParser.parse(invalidMessagesParametesJson())
- }
+ val result = messageParametersParser.parse(invalidMessagesParametesJson())
+ assertThat(result.isLeft()).describedAs("is left").isTrue()
}
}
}
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index d44e2511..cfe1dc14 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -106,6 +106,10 @@
<artifactId>arrow-effects</artifactId>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
deleted file mode 100644
index 02e6ee72..00000000
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.scheduler.Schedulers
-import javax.json.Json
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-internal class HttpServer(private val vesClient: XnfSimulator,
- private val messageParametersParser: MessageParametersParser = INSTANCE) {
-
- fun start(port: Int): IO<RatpackServer> = IO {
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(this::configureHandlers)
- }
- }
-
- private fun configureHandlers(chain: Chain) {
- chain
- .post("simulator/sync") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendIo(it) }
- .map { it.unsafeRunSync() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .post("simulator/async") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendRx(it) }
- .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
- }
- }
-
- private fun sendAcceptedResponse(ctx: Context) {
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
-
- private fun handleException(t: Throwable, ctx: Context) {
- logger.warn("Failed to process the request - ${t.localizedMessage}")
- logger.debug("Exception thrown when processing the request", t)
- ctx.response
- .status(STATUS_BAD_REQUEST)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request was not accepted")
- .add("exception", t.localizedMessage)
- .build()
- .toString())
- }
-
- companion object {
- private val logger = Logger(HttpServer::class)
- const val STATUS_OK = 200
- const val STATUS_BAD_REQUEST = 400
- const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
- }
-}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index e8a474d0..558bd1c1 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -19,98 +19,40 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.core.Either
+import arrow.core.Some
+import arrow.core.Try
+import arrow.core.fix
+import arrow.core.flatMap
+import arrow.core.monad
import arrow.effects.IO
-import io.netty.handler.ssl.ClientAuth
-import io.netty.handler.ssl.SslContext
-import io.netty.handler.ssl.SslContextBuilder
-import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
-
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import java.io.InputStream
+import javax.json.Json
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
*/
-internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security))
- }
- .build()
-
- fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
- sendRx(messages).block()
- }
-
- fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .newHandler { _, output -> handler(complete, messages, output) }
- .doOnError {
- logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- .subscribe {
- logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<PayloadWireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(MAX_BATCH_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
- .then {
- logger.info("Messages have been sent")
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): SslContext =
- SslContextBuilder.forClient()
- .keyManager(config.cert.toFile(), config.privateKey.toFile())
- .trustManager(config.trustedCert.toFile())
- .sslProvider(SslProvider.OPENSSL)
- .clientAuth(ClientAuth.REQUIRE)
- .build()
-
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
-
- companion object {
- private val logger = Logger(XnfSimulator::class)
- private const val MAX_BATCH_SIZE = 128
- private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
- }
+class XnfSimulator(
+ private val vesClient: VesHvClient,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+ private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ Either.monad<ParsingError>().binding {
+ val json = parseJsonArray(messageParameters).bind()
+ val parsed = messageParametersParser.parse(json).bind()
+ val generatedMessages = messageGenerator.createMessageFlux(parsed)
+ vesClient.sendIo(generatedMessages)
+ }.fix()
+
+ private fun parseJsonArray(jsonStream: InputStream) =
+ Try {
+ Json.createReader(jsonStream).readArray()
+ }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
new file mode 100644
index 00000000..22e47d75
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(private val configuration: SimulatorConfiguration) {
+
+ private val client: TcpClient = TcpClient.builder()
+ .options { opts ->
+ opts.host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .sslContext(createSslContext(configuration.security))
+ }
+ .build()
+
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+ sendRx(messages).then(Mono.just(Unit)).asIo()
+
+ private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .newHandler { _, output -> handler(complete, messages, output) }
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
+
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound): Publisher<Void> {
+
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
+ val frames = messages
+ .map(encoder::encode)
+ .window(MAX_BATCH_SIZE)
+
+ return nettyOutbound
+ .logConnectionClosed()
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
+ }
+
+ private fun createSslContext(config: SecurityConfiguration): SslContext =
+ SslContextBuilder.forClient()
+ .keyManager(config.cert.toFile(), config.privateKey.toFile())
+ .trustManager(config.trustedCert.toFile())
+ .sslProvider(SslProvider.OPENSSL)
+ .clientAuth(ClientAuth.REQUIRE)
+ .build()
+
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
+ companion object {
+ private val logger = Logger(VesHvClient::class)
+ private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
new file mode 100644
index 00000000..54ead6f7
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.http.Content
+import org.onap.dcae.collectors.veshv.utils.http.ContentType
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.TypedData
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class XnfApiServer(
+ private val xnfSimulator: XnfSimulator,
+ private val ongoingSimulations: OngoingSimulations) {
+
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
+ }
+ }
+
+ private fun configureHandlers(chain: Chain) {
+ chain
+ .post("simulator", ::startSimulationHandler)
+ .post("simulator/async", ::startSimulationHandler)
+ .get("simulator/:id", ::simulatorStatusHandler)
+ .get("healthcheck") { ctx ->
+ logger.info("Checking health")
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ private fun startSimulationHandler(ctx: Context) {
+ logger.info("Starting asynchronous scenario")
+ ctx.request.body.then { body ->
+ val id = startSimulation(body)
+ ctx.response.sendEitherErrorOrResponse(id)
+ }
+ }
+
+ private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
+ return xnfSimulator.startSimulation(body.inputStream)
+ .map(ongoingSimulations::startAsynchronousSimulation)
+ .map(Responses::acceptedResponse)
+ }
+
+ private fun simulatorStatusHandler(ctx: Context) {
+ val id = UUID.fromString(ctx.pathTokens["id"])
+ val status = ongoingSimulations.status(id)
+ val response = Responses.statusResponse(status.toString(), status.message)
+ ctx.response.sendAndHandleErrors(IO.just(response))
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 999d0327..56d6212a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import arrow.core.ForOption
import arrow.core.Option
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 708ffd13..9b6ef209 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-internal data class SimulatorConfiguration(
+data class SimulatorConfiguration(
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
new file mode 100644
index 00000000..95bb4897
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.effects.IO
+import kotlinx.coroutines.experimental.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+ private val asyncSimulationContext = executor.asCoroutineDispatcher()
+ private val simulations = ConcurrentHashMap<UUID, Status>()
+
+ fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ val id = UUID.randomUUID()
+ simulations[id] = StatusOngoing
+
+ simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
+ result.fold(
+ { err ->
+ logger.warn("Error", err)
+ simulations[id] = StatusFailure(err)
+ },
+ {
+ logger.info("Finished sending messages")
+ simulations[id] = StatusSuccess
+ }
+ )
+ }
+ return id
+ }
+
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() {
+ simulations.clear()
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
+
+sealed class Status(val message: String) {
+ override fun toString() = this::class.simpleName ?: "null"
+}
+
+object StatusNotFound : Status("not found")
+object StatusOngoing : Status("ongoing")
+object StatusSuccess : Status("success")
+data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index fa6d626b..c9e900ac 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
*/
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map {config ->
- XnfSimulator(config)
- .let { HttpServer(it) }
+ .map { config ->
+ XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
.start(config.listenPort)
- .void()
+ .unit()
}
.unsafeRunEitherSync(
{ ex ->
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
new file mode 100644
index 00000000..70d8ba83
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
+import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class OngoingSimulationsTest : Spek({
+ val executor = Executors.newSingleThreadExecutor()
+ val cut = OngoingSimulations(executor)
+
+ describe("simulations repository") {
+ given("not existing task task id") {
+ val id = UUID.randomUUID()
+
+ on("status") {
+ val result = cut.status(id)
+
+ it("should have 'not found' status") {
+ assertThat(result).isEqualTo(StatusNotFound)
+ }
+ }
+ }
+
+ given("never ending task") {
+ val task = IO.async<Unit> { }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have ongoing status") {
+ assertThat(cut.status(result)).isEqualTo(StatusOngoing)
+ }
+ }
+ }
+
+ given("failing task") {
+ val cause = RuntimeException("facepalm")
+ val task = IO.raiseError<Unit>(cause)
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have failing status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusFailure(cause))
+ }
+ }
+ }
+ }
+
+ given("successful task") {
+ val task = IO { println("great success!") }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have successful status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusSuccess)
+ }
+ }
+ }
+ }
+
+ afterGroup {
+ executor.shutdown()
+ }
+ }
+
+ afterEachTest { cut.clear() }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
new file mode 100644
index 00000000..80f39579
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.effects.IO
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class XnfSimulatorTest : Spek({
+ lateinit var cut: XnfSimulator
+ lateinit var vesClient: VesHvClient
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+
+ beforeEachTest {
+ vesClient = mock()
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ }
+
+ describe("startSimulation") {
+ it("should fail when empty input stream") {
+ // given
+ val emptyInputStream = ByteInputStream()
+
+ // when
+ val result = cut.startSimulation(emptyInputStream)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when invalid JSON") {
+ // given
+ val invalidJson = "invalid json".byteInputStream()
+
+ // when
+ val result = cut.startSimulation(invalidJson)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when JSON syntax is valid but content is invalid") {
+ // given
+ val json = "[1,2,3]".byteInputStream()
+ val cause = ParsingError("epic fail", None)
+ whenever(messageParametersParser.parse(any())).thenReturn(
+ Left(cause))
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).left().isEqualTo(cause)
+ }
+
+ it("should return generated messages") {
+ // given
+ val json = "[true]".byteInputStream()
+ val messageParams = listOf<MessageParameters>()
+ val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+ val sendingIo = IO {}
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
+ whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).right().isSameAs(sendingIo)
+ }
+ }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
index 8749dc5b..69caf727 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
@@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
diff --git a/pom.xml b/pom.xml
index 1ed7329d..23fb7a27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
<jacoco.version>0.8.2</jacoco.version>
+ <jacoco.minimum.coverage>66</jacoco.minimum.coverage>
<!-- Protocol buffers -->
<protobuf.version>3.5.1</protobuf.version>
@@ -261,6 +262,11 @@
</dependency>
</dependencies>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.6.0</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
@@ -352,6 +358,20 @@
<profiles>
<profile>
+ <id>docker-proxy</id>
+ <activation>
+ <property>
+ <name>docker.http_proxy</name>
+ </property>
+ </activation>
+ <properties>
+ <!-- set build args as defined in https://dmp.fabric8.io/#build-buildargs -->
+ <docker.buildArg.http_proxy>${docker.http_proxy}</docker.buildArg.http_proxy>
+ <docker.buildArg.https_proxy>${docker.http_proxy}</docker.buildArg.https_proxy>
+ </properties>
+ </profile>
+
+ <profile>
<id>docker</id>
<activation>
<property>
@@ -428,15 +448,9 @@
</name>
<registry>${docker-image.registry}</registry>
<build>
- <!--
- <args>
- <http_proxy>${docker.http_proxy}</http_proxy>
- <https_proxy>${docker.http_proxy}</https_proxy>
- </args>
- -->
<dockerFileDir>${project.basedir}</dockerFileDir>
<tags>
- <tag>${project.version}-SNAPSHOT-${maven.build.timestamp}Z</tag>
+ <tag>${project.version}-${maven.build.timestamp}Z</tag>
<tag>${project.version}</tag>
<tag>latest</tag>
</tags>
@@ -514,6 +528,11 @@
<version>${kotlin.version}</version>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <version>0.25.0</version>
+ </dependency>
+ <dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
<version>${arrow.version}</version>
@@ -549,6 +568,11 @@
<version>${arrow.version}</version>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-reactor</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.0-alpha4</version>