diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-08 16:29:31 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 07:06:19 +0200 |
commit | 7c3b59560f015b65882a56db585b7d4bdd10d434 (patch) | |
tree | 4c15d3657e373d3a681fdd2ab865623aeecc82e7 /hv-collector-client-simulator/src | |
parent | 07bbbf71cd65b29f446a1b475add87f20365db83 (diff) |
Implement Kafka Sink
Closes ONAP-146
Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-client-simulator/src')
2 files changed, 6 insertions, 5 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index d5f7c7c8..87a238a8 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -20,8 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.ves.VesEventV5 import reactor.core.publisher.Flux @@ -65,12 +63,12 @@ class MessageFactory { return WireFrame(payload) } - private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { + private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray { val msg = VesEventV5.VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() - return Unpooled.wrappedBuffer(msg.toByteArray()) + return msg.toByteArray() } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 29573e86..cb56db91 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) { .asString(Charsets.UTF_8) .subscribe { str -> logger.info("Server response: $str") } + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val frames = messages - .map { it.encode(nettyOutbound.alloc()) } + .map(encoder::encode) .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) return nettyOutbound |