diff options
10 files changed, 193 insertions, 88 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 0f0cca2d..8db767c8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,4 +34,12 @@ services: depends_on: - veshvcollector volumes: - - ./ssl/:/etc/ves-hv/
\ No newline at end of file + - ./ssl/:/etc/ves-hv/ + dcae-app-simulator: + build: + context: hv-collector-dcae-app-simulator + dockerfile: Dockerfile + ports: + - "8080:8080/tcp" + depends_on: + - kafka diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index 012bda53..86cdeca7 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -70,34 +70,8 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-internal-deps</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/libs/internal</outputDirectory> - <includeGroupIds>${project.parent.groupId}</includeGroupIds> - <includeScope>runtime</includeScope> - </configuration> - </execution> - <execution> - <id>copy-external-deps</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/libs/external</outputDirectory> - <excludeGroupIds>${project.parent.groupId}</excludeGroupIds> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> </plugin> - <!-- + <!-- TODO: unskip docker <plugin> <groupId>io.fabric8</groupId> <artifactId>docker-maven-plugin</artifactId> diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile new file mode 100644 index 00000000..8e1d7e83 --- /dev/null +++ b/hv-collector-dcae-app-simulator/Dockerfile @@ -0,0 +1,15 @@ +FROM openjdk:10-jre-slim + +LABEL copyright="Copyright (C) 2018 NOKIA" +LABEL license.name="The Apache Software License, Version 2.0" +LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" +LABEL maintainer="Nokia Wroclaw ONAP Team" + +EXPOSE 8080 + +WORKDIR /opt/ves-hv-dcae-app-simulator +ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"] +CMD ["--kafka-bootstrap-servers", "TODO", "--kafka-topics", "TODO", "--api-port", "TODO"] +COPY target/libs/external/* ./ +COPY target/libs/internal/* ./ +COPY target/hv-collector-dcae-app-simulator-*.jar ./ diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index 5796f1d2..046f5ed0 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -57,7 +57,30 @@ </plugin> </plugins> </build> - + <profiles> + <profile> + <id>docker</id> + <activation> + <property> + <name>!skipDocker</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <!-- TODO: unskip docker + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + </plugin> + --> + </plugins> + </build> + </profile> + </profiles> <dependencies> <dependency> <groupId>${project.parent.groupId}</groupId> @@ -78,6 +101,10 @@ <artifactId>reactor-kafka</artifactId> </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt index b0725d13..f7703b86 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -25,9 +25,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions -import reactor.kafka.receiver.ReceiverRecord import java.util.* -import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,17 +33,10 @@ import java.util.concurrent.atomic.AtomicLong */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - private val consumedMessages = AtomicLong(0) - - fun start(): Mono<Void> = Mono.create { sink -> - receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error) - receiver.receive().subscribe(this::update) - } - - fun consumedMessages() = consumedMessages.get() - - private fun update(record: ReceiverRecord<ByteArray, ByteArray>) { - consumedMessages.incrementAndGet() + fun start(): Mono<Consumer> = Mono.create { sink -> + val consumer = Consumer() + receiver.receive().subscribe(consumer::update) + sink.success(consumer) } companion object { @@ -60,10 +51,10 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props) - .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } } - .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } } + .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } + .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } .subscription(topics) return KafkaSource(KafkaReceiver.create(receiverOptions)) } } -}
\ No newline at end of file +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt new file mode 100644 index 00000000..5f0fe7f6 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka + +import reactor.core.publisher.Mono +import reactor.kafka.receiver.ReceiverRecord + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ + +class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?) + +interface ConsumerStateProvider { + fun currentState(): Mono<ConsumerState> +} + +class Consumer : ConsumerStateProvider { + private var msgCount = 0L + private var lastKey: ByteArray? = null + private var lastValue: ByteArray? = null + + override fun currentState(): Mono<ConsumerState> = Mono.create { sink -> + val state = synchronized(this) { + ConsumerState(msgCount, lastKey, lastValue) + } + sink.success(state) + } + + fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + synchronized(this) { + msgCount++ + lastKey = record.key() + lastValue = record.value() + } + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 75c28c48..170806a1 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -29,10 +29,10 @@ private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) fun main(args: Array<String>) { logger.info("Starting DCAE APP simulator") val port = 8080 - val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas")) - val apiServer = ApiServer(messageSource) - messageSource.start() - .then(apiServer.start(port)) + KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas")) + .start() + .map(::ApiServer) + .flatMap { it.start(port) } .block() } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index 3f4e4fc8..fcb8e131 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -19,7 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import com.google.protobuf.util.JsonFormat +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider +import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain import ratpack.server.RatpackServer import ratpack.server.ServerConfig @@ -29,7 +31,9 @@ import reactor.core.publisher.Mono * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class ApiServer(private val messageSource: KafkaSource) { +class ApiServer(private val consumerState: ConsumerStateProvider) { + private val jsonPrinter = JsonFormat.printer() + fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable { RatpackServer.of { server -> server.serverConfig(ServerConfig.embedded().port(port)) @@ -38,8 +42,35 @@ class ApiServer(private val messageSource: KafkaSource) { }.doOnNext(RatpackServer::start) private fun setupHandlers(chain: Chain) { - chain.get("messages/count") { ctx -> - ctx.response.send(messageSource.consumedMessages().toString()) - } + chain + .get("messages/count") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + consumerState.currentState() + .map { it.msgCount.toString() } + .subscribe(ctx.response::send) + } + + .get("messages/last/key") { ctx -> + ctx.response.contentType(CONTENT_JSON) + consumerState.currentState() + .map { it.lastKey } + .map { VesEvent.CommonEventHeader.parseFrom(it) } + .map { jsonPrinter.print(it) } + .subscribe(ctx.response::send) + } + + .get("messages/last/value") { ctx -> + ctx.response.contentType(CONTENT_JSON) + consumerState.currentState() + .map { it.lastValue } + .map { VesEvent.parseFrom(it) } + .map { jsonPrinter.print(it) } + .subscribe(ctx.response::send) + } + } + + companion object { + private const val CONTENT_TEXT = "text/plain" + private const val CONTENT_JSON = "application/json" } } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index f7f8eb75..a5a35ba3 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -71,34 +71,8 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-internal-deps</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/libs/internal</outputDirectory> - <includeGroupIds>${project.parent.groupId}</includeGroupIds> - <includeScope>runtime</includeScope> - </configuration> - </execution> - <execution> - <id>copy-external-deps</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/libs/external</outputDirectory> - <excludeGroupIds>${project.parent.groupId}</excludeGroupIds> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> </plugin> - <!-- + <!-- TODO: unskip docker <plugin> <groupId>io.fabric8</groupId> <artifactId>docker-maven-plugin</artifactId> @@ -70,7 +70,7 @@ <skipAnalysis>true</skipAnalysis> <!-- Docker --> - <skipDocker>true</skipDocker> + <skipDocker>true</skipDocker> <!-- TODO: unskip docker --> <docker-image.name>ves-hv-collector/${project.artifactId}</docker-image.name> <docker-image.namespace>onap</docker-image.namespace> </properties> @@ -237,11 +237,6 @@ </dependency> </dependencies> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>3.1.1</version> - </plugin> </plugins> </pluginManagement> <extensions> @@ -412,7 +407,37 @@ </images> </configuration> </plugin> - + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>3.1.1</version> + <executions> + <execution> + <id>copy-internal-deps</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/libs/internal</outputDirectory> + <includeGroupIds>${project.parent.groupId}</includeGroupIds> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-external-deps</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/libs/external</outputDirectory> + <excludeGroupIds>${project.parent.groupId}</excludeGroupIds> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> </plugins> </pluginManagement> </build> @@ -533,6 +558,11 @@ <version>${protobuf.version}</version> </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.4</version> |