From 39ceb737a99abfcff47bd5b3d819d2d3876db519 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 12 Jun 2018 12:16:19 +0200 Subject: Basic Ratpack API in DCAE APP Simulator Closes ONAP-266 Change-Id: Iaa000e976fcdc4274aa88ce7d0a6cd5866987680 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- hv-collector-dcae-app-simulator/pom.xml | 8 +++ .../veshv/simulators/dcaeapp/kafka/KafkaSource.kt | 69 ++++++++++++++++++++++ .../collectors/veshv/simulators/dcaeapp/main.kt | 37 ++++++++++-- .../veshv/simulators/dcaeapp/remote/ApiServer.kt | 45 ++++++++++++++ .../src/main/resources/logback.xml | 36 +++++++++++ 5 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt create mode 100644 hv-collector-dcae-app-simulator/src/main/resources/logback.xml (limited to 'hv-collector-dcae-app-simulator') diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index a7123e38..5796f1d2 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -69,6 +69,14 @@ hv-collector-utils ${project.parent.version} + + io.ratpack + ratpack-core + + + io.projectreactor.kafka + reactor-kafka + commons-cli commons-cli diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt new file mode 100644 index 00000000..b0725d13 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord +import java.util.* +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver) { + + private val consumedMessages = AtomicLong(0) + + fun start(): Mono = Mono.create { sink -> + receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error) + receiver.receive().subscribe(this::update) + } + + fun consumedMessages() = consumedMessages.get() + + private fun update(record: ReceiverRecord) { + consumedMessages.incrementAndGet() + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set): KafkaSource { + val props = HashMap() + props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" + props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + val receiverOptions = ReceiverOptions.create(props) + .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } } + .subscription(topics) + return KafkaSource(KafkaReceiver.create(receiverOptions)) + } + } +} \ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index a03dbcdb..75c28c48 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -1,9 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory -private val logger = LoggerFactory.getLogger("Dcae simulator :: main") +private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) -fun main(args : Array){ - logger.info("Hello world!") -} \ No newline at end of file +fun main(args: Array) { + logger.info("Starting DCAE APP simulator") + val port = 8080 + val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas")) + val apiServer = ApiServer(messageSource) + + messageSource.start() + .then(apiServer.start(port)) + .block() +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt new file mode 100644 index 00000000..3f4e4fc8 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote + +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class ApiServer(private val messageSource: KafkaSource) { + fun start(port: Int): Mono = Mono.fromCallable { + RatpackServer.of { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::setupHandlers) + } + }.doOnNext(RatpackServer::start) + + private fun setupHandlers(chain: Chain) { + chain.get("messages/count") { ctx -> + ctx.response.send(messageSource.consumedMessages().toString()) + } + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..48da3b18 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -0,0 +1,36 @@ + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + + + + + + + ${FILE_LOG_PATTERN} + + ${LOG_FILE} + + ${LOG_FILE}.%d{yyyy-MM-dd}.log + 50MB + 30 + 10GB + + + + + + + + + + + \ No newline at end of file -- cgit 1.2.3-korg