summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 08:46:51 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 12:34:27 +0200
commit162506c71c90bb0b24005f81ac0673820cf57421 (patch)
treeced018d2917b57a5ed0dd68d94c0c19c94a9c5e5 /hv-collector-core
parent85439499beabf26902ebd670e6855bdaa18f470b (diff)
Add SSL/TLS to client simulator
Change-Id: Iedebd222be08931b95e52a84c8c4d9c0df9e1da1 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt21
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt7
3 files changed, 40 insertions, 5 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 9cade1cc..535fbe12 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -39,7 +40,9 @@ internal class VesHvCollector(
private val sink: Sink) : Collector {
override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
dataStream
+ .doOnNext(this::logIncomingMessage)
.flatMap(this::decodeWire)
+ .doOnNext(this::logDecodedWireMessage)
.flatMap(this::decodeProtobuf)
.filter(this::validate)
.flatMap(this::findRoute)
@@ -47,6 +50,14 @@ internal class VesHvCollector(
.doOnNext(this::releaseMemory)
.then()
+ private fun logIncomingMessage(wire: ByteBuf) {
+ logger.debug { "Got message with total ${wire.readableBytes()} B"}
+ }
+
+ private fun logDecodedWireMessage(payload: ByteBuf) {
+ logger.debug { "Wire payload size: ${payload.readableBytes()} B"}
+ }
+
private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
@@ -65,8 +76,6 @@ internal class VesHvCollector(
msg.rawMessage.release()
}
-
-
private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
@@ -78,4 +87,8 @@ internal class VesHvCollector(
Mono.just(result)
}
}
+
+ companion object {
+ val logger = Logger(VesHvCollector::class)
+ }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8a34185f..0aacb266 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.ipc.netty.http.client.HttpClient
@@ -41,6 +44,7 @@ import java.nio.ByteBuffer
*/
object AdapterFactory {
fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+ fun loggingSink(): SinkProvider = LoggingSinkProvider()
fun staticConfigurationProvider(config: CollectorConfiguration) =
object : ConfigurationProvider {
@@ -58,8 +62,25 @@ object AdapterFactory {
}
}
+
+ private class LoggingSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return object : Sink {
+ private val logger = Logger(LoggingSinkProvider::class)
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+ messages
+ .doOnNext { msg ->
+ logger.info { "Message routed to ${msg.topic}" }
+ }
+ .map { it.message }
+
+ }
+ }
+ }
+
fun consulConfigurationProvider(url: String): ConfigurationProvider =
ConsulConfigurationProvider(url, httpAdapter())
+
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 415aa217..208b1ba0 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
logger.debug("Got connection")
- val pipe = collectorProvider().handleConnection(nettyInbound.receive())
- val hello = nettyOutbound
+ val sendHello = nettyOutbound
.options { it.flushOnEach() }
.sendString(Mono.just("ONAP_VES_HV/0.1\n"))
.then()
- return hello.then(pipe)
+ val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive())
+
+ return sendHello.then(handleIncomingMessages)
}
companion object {