diff options
Diffstat (limited to 'hv-collector-core/src/main')
20 files changed, 987 insertions, 0 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt new file mode 100644 index 00000000..d4de1b5b --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.boundary + +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux + +interface Sink { + fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> +} + +@FunctionalInterface +interface SinkProvider { + operator fun invoke(config: CollectorConfiguration): Sink + + companion object { + fun just(sink: Sink): SinkProvider = + object : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink = sink + } + } +} + +interface ConfigurationProvider { + operator fun invoke(): Flux<CollectorConfiguration> + + companion object { + fun from(function: () -> Flux<CollectorConfiguration>): ConfigurationProvider = + object : ConfigurationProvider { + override fun invoke(): Flux<CollectorConfiguration> = function() + } + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt new file mode 100644 index 00000000..809ba32f --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.boundary + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +interface Collector { + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> +} + +typealias CollectorProvider = () -> Collector + +interface Server { + fun start(): Mono<Void> +} + +interface ServerFactory { + fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt new file mode 100644 index 00000000..f5e525e7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt new file mode 100644 index 00000000..2cf2b082 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt @@ -0,0 +1,22 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt new file mode 100644 index 00000000..ea78706e --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class ServerConfiguration(val port: Int, val configurationUrl: String) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt new file mode 100644 index 00000000..79b48804 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt new file mode 100644 index 00000000..306b7762 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -0,0 +1,93 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator + +/** + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┐ + * │octet│ 0 │ 1 │ 2 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ + * │ bit │ 0│ │ │ │ │ │ │ │ 8│ │ │ │ │ │ │ │16│ │ │ │ │ │ │ │ ... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ 0xFF │ major version │ minor version │ + * └─────┴───────────────────────┴───────────────────────┴───────────────────────┘ + * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐ + * │octet│ 3 │ 4 │ 5 │ 6 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ + * ... │ bit │24│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ payload size │ + * └─────┴───────────────────────────────────────────────────────────────────────────────────────────────┘ + * ┌─────┬─────────────────────── + * │octet│ 7 ... + * ├─────┼──┬──┬──┬──┬──┬──┬──┬── + * ... │ bit │56│ │ │ │ │ │ │... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴── + * │field│ protobuf payload + * └─────┴─────────────────────── + * ``` + * + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class WireFrame(val payload: ByteBuf, + val mark: Short, + val majorVersion: Short, + val minorVersion: Short, + val payloadSize: Int) { + fun isValid(): Boolean { + return mark == FF_BYTE + && majorVersion == SUPPORTED_MAJOR_VERSION + && payload.readableBytes() == payloadSize + } + + fun encode(allocator: ByteBufAllocator): ByteBuf { + val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) + + bb.writeByte(mark.toInt()) + bb.writeByte(majorVersion.toInt()) + bb.writeByte(minorVersion.toInt()) + bb.writeInt(payloadSize) + bb.writeBytes(payload) + + return bb + } + + companion object { + fun decode(byteBuf: ByteBuf): WireFrame { + val mark = byteBuf.readUnsignedByte() + val majorVersion = byteBuf.readUnsignedByte() + val minorVersion = byteBuf.readUnsignedByte() + val payloadSize = byteBuf.readInt() + val payload = byteBuf.slice() + + return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) + } + + private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES + private const val FF_BYTE: Short = 0xFF + private const val SUPPORTED_MAJOR_VERSION: Short = 1 + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt new file mode 100644 index 00000000..3f826f7c --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +data class Routing(val routes: List<Route>) { + + fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) } +} + +data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { + + fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain + + operator fun invoke(message: VesMessage): RoutedMessage = + RoutedMessage(targetTopic, partitioning(message.header), message) +} + + +/* +Configuration DSL + */ + +fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { + val conf = RoutingBuilder() + conf.init() + return conf +} + +class RoutingBuilder { + private val routes: MutableList<RouteBuilder> = mutableListOf() + + fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { + val rule = RouteBuilder() + rule.init() + routes.add(rule) + return rule + } + + fun build() = Routing(routes.map { it.build() }.toList()) +} + +class RouteBuilder { + + private lateinit var domain: Domain + private lateinit var targetTopic: String + private lateinit var partitioning: (CommonEventHeader) -> Int + + fun fromDomain(domain: Domain) { + this.domain = domain + } + + fun toTopic(targetTopic: String) { + this.targetTopic = targetTopic + } + + fun withFixedPartitioning(num: Int = 1) { + partitioning = { _ -> num } + } + + fun build() = Route(domain, targetTopic, partitioning) + +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt new file mode 100644 index 00000000..cb018cca --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.impl.MessageValidator +import org.onap.dcae.collectors.veshv.impl.Router +import org.onap.dcae.collectors.veshv.impl.VesDecoder +import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.WireDecoder +import reactor.core.publisher.Flux +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) { + fun createVesHvCollectorProvider(): CollectorProvider { + val collector: AtomicReference<Collector> = AtomicReference() + createVesHvCollector().subscribe(collector::set) + return collector::get + } + + private fun createVesHvCollector(): Flux<Collector> = + configuration().map(this::createVesHvCollector) + + private fun createVesHvCollector(config: CollectorConfiguration): Collector { + return VesHvCollector( + WireDecoder(), + VesDecoder(), + MessageValidator(), + Router(config.routing), + sinkProvider(config)) + } + +} + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt new file mode 100644 index 00000000..5e60fa56 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.impl.NettyTcpServer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object ServerFactory { + val createNettyTcpServer: (ServerConfiguration, CollectorProvider) -> Server = ::NettyTcpServer +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt new file mode 100644 index 00000000..55e65cf7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader + +internal class MessageValidator { + + val requiredFieldDescriptors = listOf( + "version", + "eventName", + "domain", + "eventId", + "sourceName", + "reportingEntityName", + "priority", + "startEpochMicrosec", + "lastEpochMicrosec", + "sequence") + .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)} + + fun isValid(message: VesMessage): Boolean { + val header = message.header + return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + } + + private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = + requiredFieldDescriptors + .all { fieldDescriptor -> header.hasField(fieldDescriptor) } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt new file mode 100644 index 00000000..ca77df2a --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Mono +import reactor.ipc.netty.NettyInbound +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpServer +import java.util.function.BiFunction + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class NettyTcpServer(val serverConfig: ServerConfiguration, + val collectorProvider: CollectorProvider) : Server { + + override fun start(): Mono<Void> { + logger.info { "Listening on port ${serverConfig.port}" } + return Mono.defer { + val nettyContext = TcpServer.create(serverConfig.port) + .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u -> + handleConnection(t, u) + }) + Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() } + } + } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + logger.debug("Got connection") + val pipe = collectorProvider().handleConnection(nettyInbound.receive()) + + val hello = nettyOutbound + .options { it.flushOnEach() } + .sendString(Mono.just("ONAP_VES_HV/0.1\n")) + .then() + + return hello.then(pipe) + } + + companion object { + private val logger = Logger(NettyTcpServer::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt new file mode 100644 index 00000000..8c005d42 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -0,0 +1,9 @@ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.Routing +import org.onap.dcae.collectors.veshv.domain.VesMessage + +class Router(private val routing: Routing) { + fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message) +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt new file mode 100644 index 00000000..6ace06e4 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import com.google.protobuf.InvalidProtocolBufferException +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class VesDecoder { + + fun decode(bb: ByteBuf): VesMessage? = + try { + val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader + VesMessage(decodedHeader, bb) + } catch (ex: InvalidProtocolBufferException) { + logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" } + logger.debug("Cause", ex) + null + } + + + companion object { + private val logger = Logger(VesDecoder::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt new file mode 100644 index 00000000..af9d0b0a --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class VesHvCollector( + val wireDecoder: WireDecoder, + val protobufDecoder: VesDecoder, + val validator: MessageValidator, + val router: Router, + val sink: Sink) : Collector { + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .flatMap(this::decodeWire) + .flatMap(this::decodeProtobuf) + .filter(this::validate) + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext(this::releaseMemory) + .then() + + private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode) + + private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) + + private fun validate(msg: VesMessage): Boolean { + val valid = validator.isValid(msg) + if (!valid) { + msg.rawMessage.release() + } + return valid + } + + private fun findRoute(msg: VesMessage): Mono<RoutedMessage> { + val routedMessage = router.findDestination(msg) + return if (routedMessage == null) + Mono.empty() + else + Mono.just(routedMessage) + } + + private fun releaseMemory(msg: VesMessage) { + msg.rawMessage.release() + } + + private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { + val result = mapper(input) + return if (result == null) { + input.release() + Mono.empty() + } else { + Mono.just(result) + } + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt new file mode 100644 index 00000000..66ced2d7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class WireDecoder { + fun decode(byteBuf: ByteBuf): ByteBuf? = + try { + WireFrame.decode(byteBuf) + .takeIf { it.isValid() } + .let { it?.payload } + } catch (ex: IndexOutOfBoundsException) { + logger.debug { "Wire protocol frame could not be decoded - input is too small" } + null + } + + companion object { + private val logger = Logger(WireDecoder::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt new file mode 100644 index 00000000..cdfcfd30 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.common.serialization.ByteBufferSerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.ipc.netty.http.client.HttpClient +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object AdapterFactory { + fun kafkaSink(): SinkProvider = KafkaSinkProvider() + + fun staticConfigurationProvider(config: CollectorConfiguration) = + object : ConfigurationProvider { + override fun invoke() = Flux.just(config) + } + + private class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + val sender = KafkaSender.create( + SenderOptions.create<CommonEventHeader, ByteBuffer>() + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)) + return KafkaSink(sender) + } + } + + fun consulConfigurationProvider(url: String): ConfigurationProvider = + ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) +} + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt new file mode 100644 index 00000000..ef6c2f76 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -0,0 +1,63 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.ves.VesEventV5 +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import java.io.StringReader +import java.time.Duration +import java.util.* +import java.util.concurrent.atomic.AtomicReference +import javax.json.Json +import javax.json.JsonObject + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter) + : ConfigurationProvider { + + + private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + private var lastConfigurationHash: AtomicReference<Int> = AtomicReference() + + override fun invoke(): Flux<CollectorConfiguration> = + Flux.interval(Duration.ZERO, REFRESH_INTERVAL) + .flatMap { http.getResponse(url) } + .filter { body -> body.hashCode() != lastConfigurationHash.get() } + .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) } + .map { str -> getConfigurationJson(str) } + .map { json -> createCollectorConfiguration(json) } + + private fun getConfigurationJson(str: String): JsonObject { + val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) + val decodedValue = String( + Base64.getDecoder().decode(response.getString("Value"))) + logger.info("Obtained new configuration from consul:\n$decodedValue") + return Json.createReader(StringReader(decodedValue)).readObject() + } + + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { + + val routing = configuration.getJsonObject("routing") + + return CollectorConfiguration( + kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"), + routing = org.onap.dcae.collectors.veshv.domain.routing { + defineRoute { + fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain"))) + toTopic(routing.getString("toTopic")) + withFixedPartitioning() + } + }.build() + ) + } + + companion object { + private const val REFRESH_INTERVAL_MINUTES: Long = 5 + private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt new file mode 100644 index 00000000..c09af015 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -0,0 +1,29 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.publisher.toMono +import reactor.ipc.netty.http.client.HttpClient +import reactor.ipc.netty.http.client.HttpClientResponse +import java.nio.charset.Charset + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +open class HttpAdapter(private val httpClient: HttpClient) { + + private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) + + open fun getResponse(url: String): Mono<String> = + httpClient.get(url) + .onErrorResume { e -> unableToGetResource(e, url) } + .flatMap { res -> res.receiveContent().toMono() } + .map { content -> content.content().toString(Charset.defaultCharset()) } + + + private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> { + logger.info("Failed to get resource on path: $url\n${e.localizedMessage}") + return Mono.empty() + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt new file mode 100644 index 00000000..8b558a85 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderRecord +import reactor.kafka.sender.SenderResult +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink { + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + val records = messages.map(this::vesToKafkaRecord) + return sender.send(records) + .doOnNext(::logException) + .filter(::isSuccessful) + .map { it.correlationMetadata() } + } + + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> { + return SenderRecord.create( + msg.topic, + msg.partition, + System.currentTimeMillis(), + msg.message.header, + msg.message.rawMessage.nioBuffer(), + msg.message) + } + + private fun logException(senderResult: SenderResult<out Any>) { + if (senderResult.exception() != null) { + logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } + } + } + + private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + + companion object { + val logger = Logger(KafkaSink::class) + } +} |