summaryrefslogtreecommitdiffstats
path: root/hv-collector-client-simulator/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-08 16:29:31 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 07:06:19 +0200
commit7c3b59560f015b65882a56db585b7d4bdd10d434 (patch)
tree4c15d3657e373d3a681fdd2ab865623aeecc82e7 /hv-collector-client-simulator/src
parent07bbbf71cd65b29f446a1b475add87f20365db83 (diff)
Implement Kafka Sink
Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-client-simulator/src')
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt6
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt5
2 files changed, 6 insertions, 5 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
index d5f7c7c8..87a238a8 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
@@ -20,8 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.ves.VesEventV5
import reactor.core.publisher.Flux
@@ -65,12 +63,12 @@ class MessageFactory {
return WireFrame(payload)
}
- private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
+ private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray {
val msg = VesEventV5.VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
.build()
- return Unpooled.wrappedBuffer(msg.toByteArray())
+ return msg.toByteArray()
}
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 29573e86..cb56db91 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) {
.asString(Charsets.UTF_8)
.subscribe { str -> logger.info("Server response: $str") }
+ val encoder = WireFrameEncoder(nettyOutbound.alloc())
+
val frames = messages
- .map { it.encode(nettyOutbound.alloc()) }
+ .map(encoder::encode)
.concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
return nettyOutbound