diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-12 12:16:19 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 07:36:34 +0200 |
commit | 39ceb737a99abfcff47bd5b3d819d2d3876db519 (patch) | |
tree | 775aeb9f00f86bbd636676f6773ce00ce14b3564 /hv-collector-dcae-app-simulator/src/main/kotlin | |
parent | 9bdb7d9b45ade3b4181c15729fdbb6a53410cebc (diff) |
Basic Ratpack API in DCAE APP Simulator
Closes ONAP-266
Change-Id: Iaa000e976fcdc4274aa88ce7d0a6cd5866987680
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin')
3 files changed, 147 insertions, 4 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt new file mode 100644 index 00000000..b0725d13 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord +import java.util.* +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { + + private val consumedMessages = AtomicLong(0) + + fun start(): Mono<Void> = Mono.create { sink -> + receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error) + receiver.receive().subscribe(this::update) + } + + fun consumedMessages() = consumedMessages.get() + + private fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + consumedMessages.incrementAndGet() + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { + val props = HashMap<String, Any>() + props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" + props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props) + .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } } + .subscription(topics) + return KafkaSource(KafkaReceiver.create(receiverOptions)) + } + } +}
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index a03dbcdb..75c28c48 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -1,9 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory -private val logger = LoggerFactory.getLogger("Dcae simulator :: main") +private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) -fun main(args : Array<String>){ - logger.info("Hello world!") -}
\ No newline at end of file +fun main(args: Array<String>) { + logger.info("Starting DCAE APP simulator") + val port = 8080 + val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas")) + val apiServer = ApiServer(messageSource) + + messageSource.start() + .then(apiServer.start(port)) + .block() +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt new file mode 100644 index 00000000..3f4e4fc8 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote + +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class ApiServer(private val messageSource: KafkaSource) { + fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable { + RatpackServer.of { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::setupHandlers) + } + }.doOnNext(RatpackServer::start) + + private fun setupHandlers(chain: Chain) { + chain.get("messages/count") { ctx -> + ctx.response.send(messageSource.consumedMessages().toString()) + } + } +} |