summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml10
-rw-r--r--hv-collector-client-simulator/pom.xml28
-rw-r--r--hv-collector-dcae-app-simulator/Dockerfile15
-rw-r--r--hv-collector-dcae-app-simulator/pom.xml29
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt23
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt55
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt8
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt41
-rw-r--r--hv-collector-main/pom.xml28
-rw-r--r--pom.xml44
10 files changed, 193 insertions, 88 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 0f0cca2d..8db767c8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -34,4 +34,12 @@ services:
depends_on:
- veshvcollector
volumes:
- - ./ssl/:/etc/ves-hv/ \ No newline at end of file
+ - ./ssl/:/etc/ves-hv/
+ dcae-app-simulator:
+ build:
+ context: hv-collector-dcae-app-simulator
+ dockerfile: Dockerfile
+ ports:
+ - "8080:8080/tcp"
+ depends_on:
+ - kafka
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml
index 012bda53..86cdeca7 100644
--- a/hv-collector-client-simulator/pom.xml
+++ b/hv-collector-client-simulator/pom.xml
@@ -70,34 +70,8 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-internal-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
- <includeGroupIds>${project.parent.groupId}</includeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- <execution>
- <id>copy-external-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
- <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
</plugin>
- <!--
+ <!-- TODO: unskip docker
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile
new file mode 100644
index 00000000..8e1d7e83
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/Dockerfile
@@ -0,0 +1,15 @@
+FROM openjdk:10-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+EXPOSE 8080
+
+WORKDIR /opt/ves-hv-dcae-app-simulator
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
+CMD ["--kafka-bootstrap-servers", "TODO", "--kafka-topics", "TODO", "--api-port", "TODO"]
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-dcae-app-simulator-*.jar ./
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index 5796f1d2..046f5ed0 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -57,7 +57,30 @@
</plugin>
</plugins>
</build>
-
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <!-- TODO: unskip docker
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ -->
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
@@ -78,6 +101,10 @@
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
index b0725d13..f7703b86 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
@@ -25,9 +25,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
-import reactor.kafka.receiver.ReceiverRecord
import java.util.*
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,17 +33,10 @@ import java.util.concurrent.atomic.AtomicLong
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- private val consumedMessages = AtomicLong(0)
-
- fun start(): Mono<Void> = Mono.create { sink ->
- receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error)
- receiver.receive().subscribe(this::update)
- }
-
- fun consumedMessages() = consumedMessages.get()
-
- private fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
- consumedMessages.incrementAndGet()
+ fun start(): Mono<Consumer> = Mono.create { sink ->
+ val consumer = Consumer()
+ receiver.receive().subscribe(consumer::update)
+ sink.success(consumer)
}
companion object {
@@ -60,10 +51,10 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
- .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } }
- .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } }
+ .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+ .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
.subscription(topics)
return KafkaSource(KafkaReceiver.create(receiverOptions))
}
}
-} \ No newline at end of file
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
new file mode 100644
index 00000000..5f0fe7f6
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+
+import reactor.core.publisher.Mono
+import reactor.kafka.receiver.ReceiverRecord
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+
+interface ConsumerStateProvider {
+ fun currentState(): Mono<ConsumerState>
+}
+
+class Consumer : ConsumerStateProvider {
+ private var msgCount = 0L
+ private var lastKey: ByteArray? = null
+ private var lastValue: ByteArray? = null
+
+ override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
+ val state = synchronized(this) {
+ ConsumerState(msgCount, lastKey, lastValue)
+ }
+ sink.success(state)
+ }
+
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ synchronized(this) {
+ msgCount++
+ lastKey = record.key()
+ lastValue = record.value()
+ }
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 75c28c48..170806a1 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -29,10 +29,10 @@ private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
fun main(args: Array<String>) {
logger.info("Starting DCAE APP simulator")
val port = 8080
- val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
- val apiServer = ApiServer(messageSource)
- messageSource.start()
- .then(apiServer.start(port))
+ KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
+ .start()
+ .map(::ApiServer)
+ .flatMap { it.start(port) }
.block()
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index 3f4e4fc8..fcb8e131 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
@@ -29,7 +31,9 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val messageSource: KafkaSource) {
+class ApiServer(private val consumerState: ConsumerStateProvider) {
+ private val jsonPrinter = JsonFormat.printer()
+
fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
RatpackServer.of { server ->
server.serverConfig(ServerConfig.embedded().port(port))
@@ -38,8 +42,35 @@ class ApiServer(private val messageSource: KafkaSource) {
}.doOnNext(RatpackServer::start)
private fun setupHandlers(chain: Chain) {
- chain.get("messages/count") { ctx ->
- ctx.response.send(messageSource.consumedMessages().toString())
- }
+ chain
+ .get("messages/count") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.currentState()
+ .map { it.msgCount.toString() }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/key") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastKey }
+ .map { VesEvent.CommonEventHeader.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/value") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastValue }
+ .map { VesEvent.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ private const val CONTENT_JSON = "application/json"
}
}
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index f7f8eb75..a5a35ba3 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -71,34 +71,8 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-internal-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
- <includeGroupIds>${project.parent.groupId}</includeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- <execution>
- <id>copy-external-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
- <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
</plugin>
- <!--
+ <!-- TODO: unskip docker
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
diff --git a/pom.xml b/pom.xml
index af61407b..9e33ec56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
<skipAnalysis>true</skipAnalysis>
<!-- Docker -->
- <skipDocker>true</skipDocker>
+ <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<docker-image.name>ves-hv-collector/${project.artifactId}</docker-image.name>
<docker-image.namespace>onap</docker-image.namespace>
</properties>
@@ -237,11 +237,6 @@
</dependency>
</dependencies>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>3.1.1</version>
- </plugin>
</plugins>
</pluginManagement>
<extensions>
@@ -412,7 +407,37 @@
</images>
</configuration>
</plugin>
-
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <id>copy-internal-deps</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
+ <includeGroupIds>${project.parent.groupId}</includeGroupIds>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-external-deps</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
+ <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</pluginManagement>
</build>
@@ -533,6 +558,11 @@
<version>${protobuf.version}</version>
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>