summaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt74
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt85
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt129
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt)29
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt)2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt)2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt)13
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt14
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt169
9 files changed, 321 insertions, 196 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
new file mode 100644
index 00000000..262e05bf
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.getOrElse
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
+ private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+ private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+
+ fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
+
+ fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
+ if (topics.any { it.isBlank() })
+ throw IllegalArgumentException("Topic list cannot contain empty elements")
+ if (topics.isEmpty())
+ throw IllegalArgumentException("Topic list cannot be empty")
+
+ logger.info("Received new configuration. Creating consumer for topics: $topics")
+ consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
+ }.fix()
+
+ fun state() = consumerState.getOption().map { it.currentState() }
+
+ fun resetState(): IO<Unit> = consumerState.getOption().fold(
+ { IO.unit },
+ { it.reset() }
+ )
+
+ fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
+
+ private fun currentMessages(): List<ByteArray> =
+ consumerState.getOption()
+ .map { it.currentState().consumedMessages }
+ .getOrElse(::emptyList)
+
+ private fun extractTopics(topicsString: String): Set<String> =
+ topicsString.substringAfter("=")
+ .split(",")
+ .toSet()
+
+ companion object {
+ private val logger = Logger(DcaeAppSimulator::class)
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
new file mode 100644
index 00000000..239f7102
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5
+import java.io.InputStream
+import javax.json.Json
+
+class MessageStreamValidation(
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+ private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+ fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
+ IO.monadError().bindingCatch {
+ val messageParams = parseMessageParams(jsonDescription)
+ val expectedEvents = generateEvents(messageParams).bind()
+ val actualEvents = decodeConsumedEvents(consumedMessages)
+ if (shouldValidatePayloads(messageParams)) {
+ expectedEvents == actualEvents
+ } else {
+ validateHeaders(actualEvents, expectedEvents)
+ }
+ }.fix()
+
+ private fun parseMessageParams(input: InputStream): List<MessageParameters> {
+ val expectations = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(expectations)
+
+ if (messageParams.isEmpty())
+ throw IllegalArgumentException("Message param list cannot be empty")
+
+ return messageParams
+ }
+
+ private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
+ parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+
+
+ private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+ val consumedHeaders = actual.map { it.commonEventHeader }
+ val generatedHeaders = expected.map { it.commonEventHeader }
+ return generatedHeaders == consumedHeaders
+ }
+
+
+ private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+ messageGenerator.createMessageFlux(parameters)
+ .map(PayloadWireFrameMessage::payload)
+ .map(ByteData::unsafeAsArray)
+ .map(VesEventV5.VesEvent::parseFrom)
+ .collectList()
+ .asIo()
+
+ private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
+ consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt
new file mode 100644
index 00000000..6c830b9d
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import arrow.core.Left
+import arrow.core.Right
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.Response
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class ApiServer(private val simulator: DcaeAppSimulator) {
+
+
+ fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+ simulator.listenToTopics(kafkaTopics).map {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(::setupHandlers)
+ }
+ }
+
+ private fun setupHandlers(chain: Chain) {
+ chain
+ .put("configuration/topics") { ctx ->
+ val operation = ctx.bodyIo().flatMap { body ->
+ simulator.listenToTopics(body.text)
+ }
+ ctx.response.sendOrError(operation)
+
+ }
+ .delete("messages") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ ctx.response.sendOrError(simulator.resetState())
+ }
+ .get("messages/all/count") { ctx ->
+ simulator.state().fold(
+ { ctx.response.status(STATUS_NOT_FOUND) },
+ {
+ ctx.response
+ .contentType(CONTENT_TEXT)
+ .send(it.messagesCount.toString())
+ })
+ }
+ .post("messages/all/validate") { ctx ->
+ val responseStatus = IO.monad().binding {
+ val body = ctx.bodyIo().bind()
+ val isValid = simulator.validate(body.inputStream).bind()
+ if (isValid)
+ STATUS_OK
+ else
+ STATUS_BAD_REQUEST
+ }.fix()
+
+ ctx.response.sendStatusOrError(responseStatus)
+ }
+ .get("healthcheck") { ctx ->
+ ctx.response.status(STATUS_OK).send()
+ }
+ }
+
+ private fun Context.bodyIo() = request.body.asIo()
+
+ private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
+ onError {
+ emitResult(Left(it))
+ }.then { result ->
+ emitResult(Right(result))
+ }
+ }
+
+ private fun Response.sendOrError(responseStatus: IO<Unit>) {
+ sendStatusOrError(responseStatus.map { STATUS_OK })
+ }
+
+ private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
+ responseStatus.unsafeRunAsync { cb ->
+ cb.fold(
+ { err ->
+ logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
+ status(ApiServer.STATUS_INTERNAL_SERVER_ERROR)
+ .send(CONTENT_TEXT, err.message)
+ },
+ {
+ status(it).send()
+ }
+ )
+ }
+ }
+
+ companion object {
+ private val logger = Logger(ApiServer::class)
+ private const val CONTENT_TEXT = "text/plain"
+
+ private const val STATUS_OK = 200
+ private const val STATUS_BAD_REQUEST = 400
+ private const val STATUS_NOT_FOUND = 404
+ private const val STATUS_INTERNAL_SERVER_ERROR = 500
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index d53609ca..15965174 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -17,15 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import arrow.effects.IO
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
+import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
-import java.util.*
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +36,7 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
fun start(): IO<Consumer> = IO {
val consumer = Consumer()
- receiver.receive().subscribe(consumer::update)
+ receiver.receive().map(consumer::update).evaluateIo().subscribe()
consumer
}
@@ -43,18 +44,22 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
private val logger = Logger(KafkaSource::class)
fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
- val props = HashMap<String, Any>()
- props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator"
- props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators"
- props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
- val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
+ return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
+ }
+
+ fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
+ val props = mapOf<String, Any>(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+ ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
+ ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+ )
+ return ReceiverOptions.create<ByteArray, ByteArray>(props)
.addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
.addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
.subscription(topics)
- return KafkaSource(KafkaReceiver.create(receiverOptions))
}
}
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index 065cdf92..d5f55605 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
import arrow.core.ForOption
import arrow.core.Option
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
index 5bd2d155..c114313d 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
data class DcaeAppSimConfiguration(
val apiPort: Int,
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 08bb149f..1eefdbdb 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -17,9 +17,10 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
import java.util.concurrent.ConcurrentLinkedQueue
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
val messagesCount: Int by lazy {
messages.size
}
@@ -53,19 +54,17 @@ class Consumer : ConsumerStateProvider {
consumedMessages.clear()
}
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
-
companion object {
private val logger = Logger(Consumer::class)
}
}
class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
- return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
- }
+ fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 9f84fc4d..a65a2686 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,10 +20,12 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.void
@@ -50,7 +52,7 @@ fun main(args: Array<String>) =
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+ return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
.start(config.apiPort, config.kafkaTopics)
.void()
-} \ No newline at end of file
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
deleted file mode 100644
index cd258134..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
-import org.onap.ves.VesEventV5.VesEvent
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
-import javax.json.Json
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class ApiServer(private val consumerFactory: ConsumerFactory,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
-
- private lateinit var consumerState: ConsumerStateProvider
-
- fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
- consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(this::setupHandlers)
- }
- }
-
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { it ->
- val topics = extractTopics(it.text)
- logger.info("Received new configuration. Creating consumer for topics: $topics")
- consumerState = consumerFactory.createConsumerForTopics(topics)
- ctx.response
- .status(STATUS_OK)
- .send()
- }
-
- }
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- consumerState.reset()
- .unsafeRunAsync {
- it.fold(
- { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
- { ctx.response.status(STATUS_OK) }
- ).send()
- }
- }
- .get("messages/all/count") { ctx ->
- val state = consumerState.currentState()
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(state.messagesCount.toString())
- }
- .post("messages/all/validate") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { generateEvents(ctx, it) }
- .then { (generatedEvents, shouldValidatePayloads) ->
- generatedEvents
- .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
- .block()
- }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
- }
- }
-
- private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
- Pair<Mono<List<VesEvent>>, Boolean> = Pair(
-
- doGenerateEvents(parameters).doOnError {
- logger.error("Error occurred when generating messages: $it")
- ctx.response
- .status(STATUS_INTERNAL_SERVER_ERROR)
- .send()
- },
- parameters.all { it.messageType == FIXED_PAYLOAD }
- )
-
- private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
- .createMessageFlux(parameters)
- .map(PayloadWireFrameMessage::payload)
- .map { decode(it.unsafeAsArray()) }
- .collectList()
-
-
- private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
-
-
- private fun sendResponse(ctx: Context,
- generatedEvents: List<VesEvent>,
- shouldValidatePayloads: Boolean) =
- resolveResponseStatusCode(
- generated = generatedEvents,
- consumed = decodeConsumedEvents(),
- validatePayloads = shouldValidatePayloads
- ).let { ctx.response.status(it).send() }
-
-
- private fun decodeConsumedEvents(): List<VesEvent> = consumerState
- .currentState()
- .consumedMessages
- .map(::decode)
-
-
- private fun resolveResponseStatusCode(generated: List<VesEvent>,
- consumed: List<VesEvent>,
- validatePayloads: Boolean): Int =
- if (validatePayloads) {
- if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
- } else {
- validateHeaders(consumed, generated)
- }
-
- private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
- val consumedHeaders = consumed.map { it.commonEventHeader }
- val generatedHeaders = generated.map { it.commonEventHeader }
- return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
- }
-
- private fun extractTopics(it: String): Set<String> =
- it.substringAfter("=")
- .split(",")
- .toSet()
-
- companion object {
- private val logger = Logger(ApiServer::class)
- private const val CONTENT_TEXT = "text/plain"
-
- private const val STATUS_OK = 200
- private const val STATUS_BAD_REQUEST = 400
- private const val STATUS_INTERNAL_SERVER_ERROR = 500
- }
-}
-
-