diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-12 12:16:19 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 07:36:34 +0200 |
commit | 39ceb737a99abfcff47bd5b3d819d2d3876db519 (patch) | |
tree | 775aeb9f00f86bbd636676f6773ce00ce14b3564 /hv-collector-dcae-app-simulator | |
parent | 9bdb7d9b45ade3b4181c15729fdbb6a53410cebc (diff) |
Basic Ratpack API in DCAE APP Simulator
Closes ONAP-266
Change-Id: Iaa000e976fcdc4274aa88ce7d0a6cd5866987680
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-dcae-app-simulator')
5 files changed, 191 insertions, 4 deletions
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index a7123e38..5796f1d2 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -70,6 +70,14 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt new file mode 100644 index 00000000..b0725d13 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord +import java.util.* +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { + + private val consumedMessages = AtomicLong(0) + + fun start(): Mono<Void> = Mono.create { sink -> + receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error) + receiver.receive().subscribe(this::update) + } + + fun consumedMessages() = consumedMessages.get() + + private fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + consumedMessages.incrementAndGet() + } + + companion object { + private val logger = Logger(KafkaSource::class) + + fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { + val props = HashMap<String, Any>() + props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" + props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props) + .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } } + .subscription(topics) + return KafkaSource(KafkaReceiver.create(receiverOptions)) + } + } +}
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index a03dbcdb..75c28c48 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -1,9 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory -private val logger = LoggerFactory.getLogger("Dcae simulator :: main") +private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) -fun main(args : Array<String>){ - logger.info("Hello world!") -}
\ No newline at end of file +fun main(args: Array<String>) { + logger.info("Starting DCAE APP simulator") + val port = 8080 + val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas")) + val apiServer = ApiServer(messageSource) + + messageSource.start() + .then(apiServer.start(port)) + .block() +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt new file mode 100644 index 00000000..3f4e4fc8 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote + +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import ratpack.handling.Chain +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class ApiServer(private val messageSource: KafkaSource) { + fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable { + RatpackServer.of { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::setupHandlers) + } + }.doOnNext(RatpackServer::start) + + private fun setupHandlers(chain: Chain) { + chain.get("messages/count") { ctx -> + ctx.response.send(messageSource.consumedMessages().toString()) + } + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..48da3b18 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> + <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file |