aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src
diff options
context:
space:
mode:
authorPrzemyslaw Wasala <przemyslaw.wasala@nokia.com>2018-08-02 06:26:50 +0000
committerGerrit Code Review <gerrit@onap.org>2018-08-02 06:26:50 +0000
commite93ba2b32d71844a7075a021631f40a0cc4888df (patch)
tree7b9e947cc5fe65634a3a0bc77e71a39c36e2be33 /hv-collector-dcae-app-simulator/src
parent32aeb329986b3d6b9671c0a9e555cbff43964b3a (diff)
parent94eeb738945da9bda072bd65d59a18a5183d12c6 (diff)
Merge "Dockerize DCAE APP simulator"
Diffstat (limited to 'hv-collector-dcae-app-simulator/src')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt23
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt55
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt8
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt41
4 files changed, 102 insertions, 25 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
index b0725d13..f7703b86 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
@@ -25,9 +25,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
-import reactor.kafka.receiver.ReceiverRecord
import java.util.*
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,17 +33,10 @@ import java.util.concurrent.atomic.AtomicLong
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- private val consumedMessages = AtomicLong(0)
-
- fun start(): Mono<Void> = Mono.create { sink ->
- receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error)
- receiver.receive().subscribe(this::update)
- }
-
- fun consumedMessages() = consumedMessages.get()
-
- private fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
- consumedMessages.incrementAndGet()
+ fun start(): Mono<Consumer> = Mono.create { sink ->
+ val consumer = Consumer()
+ receiver.receive().subscribe(consumer::update)
+ sink.success(consumer)
}
companion object {
@@ -60,10 +51,10 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
- .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } }
- .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } }
+ .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+ .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
.subscription(topics)
return KafkaSource(KafkaReceiver.create(receiverOptions))
}
}
-} \ No newline at end of file
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
new file mode 100644
index 00000000..5f0fe7f6
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+
+import reactor.core.publisher.Mono
+import reactor.kafka.receiver.ReceiverRecord
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+
+interface ConsumerStateProvider {
+ fun currentState(): Mono<ConsumerState>
+}
+
+class Consumer : ConsumerStateProvider {
+ private var msgCount = 0L
+ private var lastKey: ByteArray? = null
+ private var lastValue: ByteArray? = null
+
+ override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
+ val state = synchronized(this) {
+ ConsumerState(msgCount, lastKey, lastValue)
+ }
+ sink.success(state)
+ }
+
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ synchronized(this) {
+ msgCount++
+ lastKey = record.key()
+ lastValue = record.value()
+ }
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 75c28c48..170806a1 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -29,10 +29,10 @@ private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
fun main(args: Array<String>) {
logger.info("Starting DCAE APP simulator")
val port = 8080
- val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
- val apiServer = ApiServer(messageSource)
- messageSource.start()
- .then(apiServer.start(port))
+ KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
+ .start()
+ .map(::ApiServer)
+ .flatMap { it.start(port) }
.block()
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index 3f4e4fc8..fcb8e131 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
@@ -29,7 +31,9 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val messageSource: KafkaSource) {
+class ApiServer(private val consumerState: ConsumerStateProvider) {
+ private val jsonPrinter = JsonFormat.printer()
+
fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
RatpackServer.of { server ->
server.serverConfig(ServerConfig.embedded().port(port))
@@ -38,8 +42,35 @@ class ApiServer(private val messageSource: KafkaSource) {
}.doOnNext(RatpackServer::start)
private fun setupHandlers(chain: Chain) {
- chain.get("messages/count") { ctx ->
- ctx.response.send(messageSource.consumedMessages().toString())
- }
+ chain
+ .get("messages/count") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.currentState()
+ .map { it.msgCount.toString() }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/key") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastKey }
+ .map { VesEvent.CommonEventHeader.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/value") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastValue }
+ .map { VesEvent.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ private const val CONTENT_JSON = "application/json"
}
}