From 7c3b59560f015b65882a56db585b7d4bdd10d434 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 8 Jun 2018 16:29:31 +0200 Subject: Implement Kafka Sink Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- hv-collector-client-simulator/Dockerfile | 2 +- .../dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt | 6 ++---- .../onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt | 5 ++++- 3 files changed, 7 insertions(+), 6 deletions(-) (limited to 'hv-collector-client-simulator') diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile index 19c4c878..58cfa448 100644 --- a/hv-collector-client-simulator/Dockerfile +++ b/hv-collector-client-simulator/Dockerfile @@ -7,7 +7,7 @@ LABEL maintainer="Nokia Wroclaw ONAP Team" WORKDIR /opt/ves-hv-client-simulator ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"] -CMD ["--ves-host", "hv-collector", "--ves-port", "6061"] +CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ COPY target/hv-collector-client-simulator-*.jar ./ diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index d5f7c7c8..87a238a8 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -20,8 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.ves.VesEventV5 import reactor.core.publisher.Flux @@ -65,12 +63,12 @@ class MessageFactory { return WireFrame(payload) } - private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { + private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray { val msg = VesEventV5.VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() - return Unpooled.wrappedBuffer(msg.toByteArray()) + return msg.toByteArray() } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 29573e86..cb56db91 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) { .asString(Charsets.UTF_8) .subscribe { str -> logger.info("Server response: $str") } + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val frames = messages - .map { it.encode(nettyOutbound.alloc()) } + .map(encoder::encode) .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) return nettyOutbound -- cgit 1.2.3-korg