summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt2
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt2
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt8
-rw-r--r--sources/hv-collector-ves-message-generator/pom.xml4
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt22
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt8
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt)3
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt)33
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt)4
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt)6
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt1
-rw-r--r--sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt (renamed from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt)2
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt (renamed from sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt)41
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt3
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt3
-rw-r--r--sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/pom.xml4
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt54
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt49
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt106
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt31
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt51
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt69
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt64
25 files changed, 351 insertions, 228 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 5d9a7cfc..47a2d22a 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import java.io.InputStream
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 8fb1b2ef..bff7709d 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index 290ef72c..56825221 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -23,11 +23,10 @@ import arrow.core.Either
import arrow.core.Left
import arrow.core.Right
import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
import kotlin.system.exitProcess
/**
@@ -62,6 +61,9 @@ fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
})
}
+fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
+ toMono().then(Mono.fromCallable(callback))
+
fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
flatMap { io ->
io.attempt().unsafeRunSync().fold(
diff --git a/sources/hv-collector-ves-message-generator/pom.xml b/sources/hv-collector-ves-message-generator/pom.xml
index 29e32f46..e676dfa9 100644
--- a/sources/hv-collector-ves-message-generator/pom.xml
+++ b/sources/hv-collector-ves-message-generator/pom.xml
@@ -74,6 +74,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.onap.dcaegen2.services.sdk</groupId>
+ <artifactId>hvvesclient-producer-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
index 82b79c0c..a7187166 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -19,20 +19,36 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
import org.onap.ves.VesEventOuterClass
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-abstract class MessageParameters(val amount: Long = -1)
+sealed class MessageParameters
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since February 2019
*/
class WireFrameParameters(val messageType: WireFrameType,
- amount: Long = -1) : MessageParameters(amount)
+ val amount: Long = -1) : MessageParameters() {
+
+ val wireFrameVersion: WireFrameVersion
+ get() = ImmutableWireFrameVersion.builder().let {
+ if (messageType == INVALID_WIRE_FRAME)
+ it.major(UNSUPPORTED_MAJOR_VERSION)
+ else
+ it
+ }.build()
+
+ companion object {
+ private const val UNSUPPORTED_MAJOR_VERSION: Short = 2
+ }
+}
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -40,4 +56,4 @@ class WireFrameParameters(val messageType: WireFrameType,
*/
class VesEventParameters(val commonEventHeader: VesEventOuterClass.CommonEventHeader,
val messageType: VesEventType,
- amount: Long = -1) : MessageParameters(amount)
+ val amount: Long = -1) : MessageParameters()
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
index aa473796..613f9bd1 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
@@ -19,9 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.factory
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.PayloadGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireFrameGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -30,5 +30,5 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireF
class MessageGeneratorFactory(private val maxPayloadSizeBytes: Int) {
fun createVesEventGenerator() = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes)
- fun createWireFrameGenerator() = WireFrameGenerator()
+ fun createWireFrameGenerator() = RawMessageGenerator()
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt
index 5f8638f0..5682cc4c 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt
@@ -17,8 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.api
+package org.onap.dcae.collectors.veshv.ves.message.generator.generators
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt
index ad45bc5c..9f20bd29 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt
@@ -17,12 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+package org.onap.dcae.collectors.veshv.ves.message.generator.generators
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadContentType
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import io.netty.buffer.Unpooled
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA
@@ -30,37 +27,29 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.IN
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
import java.nio.charset.Charset
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since February 2019
*/
-class WireFrameGenerator : MessageGenerator<WireFrameParameters, WireFrameMessage>() {
+class RawMessageGenerator : MessageGenerator<WireFrameParameters, ByteBuffer>() {
- override fun createMessageFlux(parameters: WireFrameParameters): Flux<WireFrameMessage> =
+ override fun createMessageFlux(parameters: WireFrameParameters): Flux<ByteBuffer> =
parameters.run {
Mono
.fromCallable { createMessage(messageType) }
.let { repeatMessage(it, amount) }
}
- private fun createMessage(messageType: WireFrameType): WireFrameMessage =
+ private fun createMessage(messageType: WireFrameType): ByteBuffer =
when (messageType) {
- INVALID_WIRE_FRAME -> {
- val payload = ByteData(VesEvent.getDefaultInstance().toByteArray())
- WireFrameMessage(
- payload,
- UNSUPPORTED_VERSION,
- UNSUPPORTED_VERSION,
- PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
- payload.size())
- }
- INVALID_GPB_DATA ->
- WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+ INVALID_WIRE_FRAME -> wrap(VesEvent.getDefaultInstance().toByteArray())
+ INVALID_GPB_DATA -> wrap("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
- companion object {
- private const val UNSUPPORTED_VERSION: Short = 2
- }
+ private fun wrap(bytes: ByteArray) = Unpooled.wrappedBuffer(bytes).nioBuffer()
+
+
}
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt
index 7abd6054..a6669e7d 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt
@@ -17,15 +17,15 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+package org.onap.dcae.collectors.veshv.ves.message.generator.generators
import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
index 05938924..f0ae4607 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import arrow.core.Option
import com.google.protobuf.util.JsonFormat
@@ -29,7 +29,7 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since July 2018
*/
-class CommonEventHeaderParser {
+internal class CommonEventHeaderParser {
fun parse(json: JsonObject): Option<CommonEventHeader> = Option.fromNullable(
CommonEventHeader.newBuilder()
.apply { JsonFormat.parser().merge(json.toString(), this) }
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
index 174a01fd..7d6087c2 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.Com
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.Companion.isWireFrameType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.CommonEventHeaderParser
import javax.json.JsonArray
import javax.json.JsonObject
import javax.json.JsonValue
diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
index ed521054..5891e7bc 100644
--- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt
+++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
import java.util.*
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt
index f8c84c39..8c8c8357 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2010 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.raw
import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions
@@ -25,13 +25,13 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator
import org.onap.ves.VesEventOuterClass
import reactor.test.test
-import kotlin.test.assertTrue
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -40,7 +40,7 @@ import kotlin.test.assertTrue
object WireFrameGeneratorTest : Spek({
val maxPayloadSizeBytes = 1024
- val cut = WireFrameGenerator()
+ val cut = RawMessageGenerator()
on("message type requesting invalid GPB data ") {
it("should createVesEventGenerator flux of messages with invalid payload") {
@@ -50,32 +50,17 @@ object WireFrameGeneratorTest : Spek({
))
.test()
.assertNext {
- assertTrue(it.validate().isRight())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
- .isThrownBy { extractCommonEventHeader(it.payload) }
- }
- .verifyComplete()
- }
- }
+ val decodedBytes = it.array().toString(Charset.defaultCharset())
+ assertThat(decodedBytes).isEqualTo("invalid vesEvent")
+ assertThat(it.capacity()).isLessThan(maxPayloadSizeBytes)
- on("message type requesting invalid wire frame ") {
- it("should createVesEventGenerator flux of messages with invalid version") {
- cut
- .createMessageFlux(WireFrameParameters(
- WireFrameType.INVALID_WIRE_FRAME, 1
- ))
- .test()
- .assertNext {
- assertTrue(it.validate().isLeft())
- assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
- assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
+ Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { extractCommonEventHeader(it) }
}
.verifyComplete()
}
}
-
})
-fun extractCommonEventHeader(bytes: ByteData): VesEventOuterClass.CommonEventHeader =
- VesEventOuterClass.VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+private fun extractCommonEventHeader(bytes: ByteBuffer): VesEventOuterClass.CommonEventHeader =
+ VesEventOuterClass.VesEvent.parseFrom(bytes).commonEventHeader
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
index 04222d1e..09635afd 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParser
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import java.io.ByteArrayInputStream
import javax.json.Json
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
index 2d77bb9f..4558bb1a 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
object PayloadGeneratorTest : Spek({
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
index 2f13c52e..fa99bfb6 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
@@ -28,8 +28,10 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import reactor.test.test
/**
diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml
index a8134100..69ca53b2 100644
--- a/sources/hv-collector-xnf-simulator/pom.xml
+++ b/sources/hv-collector-xnf-simulator/pom.xml
@@ -110,10 +110,6 @@
<artifactId>hvvesclient-producer-impl</artifactId>
</dependency>
<dependency>
- <groupId>org.onap.dcaegen2.services.sdk</groupId>
- <artifactId>hvvesclient-producer-api</artifactId>
- </dependency>
- <dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-test-utils</artifactId>
<version>${project.parent.version}</version>
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 4dfdb845..812afe19 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,12 +26,15 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
@@ -42,18 +45,18 @@ import javax.json.JsonArray
* @since August 2018
*/
class XnfSimulator(
- private val vesClient: VesHvClient,
+ private val clientFactory: ClientFactory,
private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+ private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+ private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
-
val json = parseJsonArray(messageParameters).bind()
- messageParametersParser.parse(json).bind()
- .toFlux()
- .flatMap(::generateMessages)
- .let { vesClient.sendIo(it) }
+ val parameters = messageParametersParser.parse(json).bind()
+ simulationFrom(parameters)
}.fix()
private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
@@ -61,18 +64,23 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun simulate(parameters: MessageParameters): Mono<Unit> =
when (parameters) {
- is VesEventParameters -> generatorFactory
- .createVesEventGenerator()
- .createMessageFlux(parameters)
- .map(::encodeToWireFrame)
- is WireFrameParameters -> generatorFactory
- .createWireFrameGenerator()
- .createMessageFlux(parameters)
- else -> throw IllegalStateException("Invalid parameters type")
+ is VesEventParameters -> {
+ val messages = vesEventGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create()
+ client.sendVesEvents(messages)
+ }
+ is WireFrameParameters -> {
+ val messages = wireFrameGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create(parameters.wireFrameVersion)
+ client.sendRawPayload(messages)
+ }
}
-
- private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
- WireFrameMessage(event.toByteArray())
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
new file mode 100644
index 00000000..afc157c4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HvVesClient(private val producer: HvVesProducer) {
+
+ fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
+ producer.send(messages)
+ .then { logger.info { "Ves Events have been sent" } }
+
+
+ fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
+ producer.sendRaw(messages, PayloadType.UNDEFINED)
+ .then { logger.info { "Raw messages have been sent" } }
+
+ companion object {
+ private val logger = Logger(HvVesClient::class)
+ }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
deleted file mode 100644
index eba8ed88..00000000
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpClient
-import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-class VesHvClient(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.create()
- .addressSupplier { configuration.hvVesAddress }
- .configureSsl()
-
- private fun TcpClient.configureSsl() =
- createSslContext(configuration.security)
- .map { sslContext -> this.secure(sslContext) }
- .getOrElse { this }
-
- fun sendIo(messages: Flux<WireFrameMessage>) =
- sendRx(messages).then(Mono.just(Unit)).asIo()
-
- private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .handle { _, output -> handler(complete, messages, output) }
- .connect()
- .doOnError {
- logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- .subscribe {
- logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<WireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(XS_BUFFER_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .then {
- logger.info { "Messages have been sent" }
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
- SslContextFactory().createClientContext(config)
-
- private fun NettyOutbound.logConnectionClosed() =
- withConnection { conn ->
- conn.onDispose {
- logger.info { "Connection to ${conn.address()} has been closed" }
- }
- }
-
- companion object {
- private val logger = Logger(VesHvClient::class)
- }
-}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
new file mode 100644
index 00000000..1db66f11
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import io.vavr.collection.Set
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
+ val security: SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
new file mode 100644
index 00000000..a91fccd4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class ClientFactory(configuration: ClientConfiguration) {
+
+ private val partialConfig = ImmutableProducerOptions
+ .builder()
+ .collectorAddresses(configuration.collectorAddresses)
+ .let { producerOptions ->
+ configuration.security.keys.fold(
+ { producerOptions },
+ { producerOptions.securityKeys(it) })
+ }
+
+ fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
+ buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
+
+
+ fun create(): HvVesClient = buildClient(partialConfig)
+
+ private fun buildClient(config: ImmutableProducerOptions.Builder) =
+ HvVesClient(HvVesProducerFactory.create(config.build()))
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index ef627304..366c7e66 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -23,15 +23,17 @@ import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monad.monad
import arrow.typeclasses.binding
+import io.vavr.collection.HashSet
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
@@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
val xnfSimulator = XnfSimulator(
- VesHvClient(config),
+ ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
)
XnfApiServer(xnfSimulator, OngoingSimulations())
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
new file mode 100644
index 00000000..daf30617
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.ves.VesEventOuterClass
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+internal class HvVesClientTest : Spek({
+ describe("HvVesClient") {
+ val hvVesProducer: HvVesProducer = mock()
+ val cut = HvVesClient(hvVesProducer)
+
+ describe("handling ves events stream") {
+
+ val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
+ whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
+ cut.sendVesEvents(vesEvents)
+
+ it("should perform sending operation") {
+ verify(hvVesProducer).send(vesEvents)
+ }
+ }
+
+ describe("handling raw message stream") {
+
+ val rawMessages = Flux.empty<ByteBuffer>()
+ whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty())
+ cut.sendRawPayload(rawMessages)
+
+ it("should perform sending operation") {
+ verify(hvVesProducer).sendRaw(eq(rawMessages), any())
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 192725b9..123f12ae 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Left
import arrow.core.None
+import arrow.core.Right
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.io.ByteArrayInputStream
/**
@@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream
*/
internal class XnfSimulatorTest : Spek({
lateinit var cut: XnfSimulator
- lateinit var vesClient: VesHvClient
+ lateinit var clientFactory: ClientFactory
lateinit var messageParametersParser: MessageParametersParser
lateinit var generatorFactory: MessageGeneratorFactory
beforeEachTest {
- vesClient = mock()
+ clientFactory = mock()
messageParametersParser = mock()
generatorFactory = mock()
- cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser)
+ cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser)
}
describe("startSimulation") {
@@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({
assertThat(result).left().isEqualTo(cause)
}
- // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator
-// it("should return generated messages") {
-// // given
-// val json = "[true]".byteInputStream()
-// val messageParams = listOf<MessageParameters>()
-// val generatedMessages = Flux.empty<WireFrameMessage>()
-// val sendingIo = IO {}
-// whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
-// whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
-// whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
-//
-// // when
-// val result = cut.startSimulation(json)
-//
-// // then
-// assertThat(result).right().isSameAs(sendingIo)
-// }
+ it("should return generated ves messages") {
+ // given
+ val vesEventGenerator: VesEventGenerator = mock()
+ val vesClient: HvVesClient = mock()
+
+ val json = "[true]".byteInputStream()
+
+ val vesEventParams = VesEventParameters(
+ CommonEventHeader.getDefaultInstance(),
+ VesEventType.VALID,
+ 1
+ )
+ val messageParams = listOf(vesEventParams)
+
+ val generatedMessages = Flux.empty<VesEventOuterClass.VesEvent>()
+
+
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
+ whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
+ whenever(clientFactory.create()).thenReturn(vesClient)
+ whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+ // when
+ cut.startSimulation(json).map { it.unsafeRunSync() }
+
+ // then
+ verify(vesClient).sendVesEvents(generatedMessages)
+ }
}
}) \ No newline at end of file