diff options
Diffstat (limited to 'hv-collector-core')
28 files changed, 1657 insertions, 0 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml new file mode 100644 index 00000000..ed501a44 --- /dev/null +++ b/hv-collector-core/pom.xml @@ -0,0 +1,142 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.veshv</groupId> + <artifactId>ves-hv-collector</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-core</artifactId> + <description>VES HighVolume Collector :: Core</description> + + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>protobuf</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-reflect</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.ipc</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <scope>runtime</scope> + </dependency> + + + <dependency> + <groupId>com.nhaarman</groupId> + <artifactId>mockito-kotlin</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt new file mode 100644 index 00000000..d4de1b5b --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.boundary + +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux + +interface Sink { + fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> +} + +@FunctionalInterface +interface SinkProvider { + operator fun invoke(config: CollectorConfiguration): Sink + + companion object { + fun just(sink: Sink): SinkProvider = + object : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink = sink + } + } +} + +interface ConfigurationProvider { + operator fun invoke(): Flux<CollectorConfiguration> + + companion object { + fun from(function: () -> Flux<CollectorConfiguration>): ConfigurationProvider = + object : ConfigurationProvider { + override fun invoke(): Flux<CollectorConfiguration> = function() + } + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt new file mode 100644 index 00000000..809ba32f --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.boundary + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +interface Collector { + fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> +} + +typealias CollectorProvider = () -> Collector + +interface Server { + fun start(): Mono<Void> +} + +interface ServerFactory { + fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt new file mode 100644 index 00000000..f5e525e7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt new file mode 100644 index 00000000..2cf2b082 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt @@ -0,0 +1,22 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt new file mode 100644 index 00000000..ea78706e --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class ServerConfiguration(val port: Int, val configurationUrl: String) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt new file mode 100644 index 00000000..79b48804 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt new file mode 100644 index 00000000..306b7762 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -0,0 +1,93 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator + +/** + * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * + * ``` + * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┐ + * │octet│ 0 │ 1 │ 2 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ + * │ bit │ 0│ │ │ │ │ │ │ │ 8│ │ │ │ │ │ │ │16│ │ │ │ │ │ │ │ ... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ 0xFF │ major version │ minor version │ + * └─────┴───────────────────────┴───────────────────────┴───────────────────────┘ + * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐ + * │octet│ 3 │ 4 │ 5 │ 6 │ + * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤ + * ... │ bit │24│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┤ + * │field│ payload size │ + * └─────┴───────────────────────────────────────────────────────────────────────────────────────────────┘ + * ┌─────┬─────────────────────── + * │octet│ 7 ... + * ├─────┼──┬──┬──┬──┬──┬──┬──┬── + * ... │ bit │56│ │ │ │ │ │ │... + * ├─────┼──┴──┴──┴──┴──┴──┴──┴── + * │field│ protobuf payload + * └─────┴─────────────────────── + * ``` + * + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class WireFrame(val payload: ByteBuf, + val mark: Short, + val majorVersion: Short, + val minorVersion: Short, + val payloadSize: Int) { + fun isValid(): Boolean { + return mark == FF_BYTE + && majorVersion == SUPPORTED_MAJOR_VERSION + && payload.readableBytes() == payloadSize + } + + fun encode(allocator: ByteBufAllocator): ByteBuf { + val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) + + bb.writeByte(mark.toInt()) + bb.writeByte(majorVersion.toInt()) + bb.writeByte(minorVersion.toInt()) + bb.writeInt(payloadSize) + bb.writeBytes(payload) + + return bb + } + + companion object { + fun decode(byteBuf: ByteBuf): WireFrame { + val mark = byteBuf.readUnsignedByte() + val majorVersion = byteBuf.readUnsignedByte() + val minorVersion = byteBuf.readUnsignedByte() + val payloadSize = byteBuf.readInt() + val payload = byteBuf.slice() + + return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) + } + + private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES + private const val FF_BYTE: Short = 0xFF + private const val SUPPORTED_MAJOR_VERSION: Short = 1 + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt new file mode 100644 index 00000000..3f826f7c --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +data class Routing(val routes: List<Route>) { + + fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) } +} + +data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { + + fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain + + operator fun invoke(message: VesMessage): RoutedMessage = + RoutedMessage(targetTopic, partitioning(message.header), message) +} + + +/* +Configuration DSL + */ + +fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { + val conf = RoutingBuilder() + conf.init() + return conf +} + +class RoutingBuilder { + private val routes: MutableList<RouteBuilder> = mutableListOf() + + fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { + val rule = RouteBuilder() + rule.init() + routes.add(rule) + return rule + } + + fun build() = Routing(routes.map { it.build() }.toList()) +} + +class RouteBuilder { + + private lateinit var domain: Domain + private lateinit var targetTopic: String + private lateinit var partitioning: (CommonEventHeader) -> Int + + fun fromDomain(domain: Domain) { + this.domain = domain + } + + fun toTopic(targetTopic: String) { + this.targetTopic = targetTopic + } + + fun withFixedPartitioning(num: Int = 1) { + partitioning = { _ -> num } + } + + fun build() = Route(domain, targetTopic, partitioning) + +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt new file mode 100644 index 00000000..cb018cca --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.impl.MessageValidator +import org.onap.dcae.collectors.veshv.impl.Router +import org.onap.dcae.collectors.veshv.impl.VesDecoder +import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.WireDecoder +import reactor.core.publisher.Flux +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) { + fun createVesHvCollectorProvider(): CollectorProvider { + val collector: AtomicReference<Collector> = AtomicReference() + createVesHvCollector().subscribe(collector::set) + return collector::get + } + + private fun createVesHvCollector(): Flux<Collector> = + configuration().map(this::createVesHvCollector) + + private fun createVesHvCollector(config: CollectorConfiguration): Collector { + return VesHvCollector( + WireDecoder(), + VesDecoder(), + MessageValidator(), + Router(config.routing), + sinkProvider(config)) + } + +} + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt new file mode 100644 index 00000000..5e60fa56 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.impl.NettyTcpServer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object ServerFactory { + val createNettyTcpServer: (ServerConfiguration, CollectorProvider) -> Server = ::NettyTcpServer +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt new file mode 100644 index 00000000..55e65cf7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader + +internal class MessageValidator { + + val requiredFieldDescriptors = listOf( + "version", + "eventName", + "domain", + "eventId", + "sourceName", + "reportingEntityName", + "priority", + "startEpochMicrosec", + "lastEpochMicrosec", + "sequence") + .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)} + + fun isValid(message: VesMessage): Boolean { + val header = message.header + return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + } + + private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = + requiredFieldDescriptors + .all { fieldDescriptor -> header.hasField(fieldDescriptor) } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt new file mode 100644 index 00000000..ca77df2a --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Mono +import reactor.ipc.netty.NettyInbound +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpServer +import java.util.function.BiFunction + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class NettyTcpServer(val serverConfig: ServerConfiguration, + val collectorProvider: CollectorProvider) : Server { + + override fun start(): Mono<Void> { + logger.info { "Listening on port ${serverConfig.port}" } + return Mono.defer { + val nettyContext = TcpServer.create(serverConfig.port) + .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u -> + handleConnection(t, u) + }) + Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() } + } + } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + logger.debug("Got connection") + val pipe = collectorProvider().handleConnection(nettyInbound.receive()) + + val hello = nettyOutbound + .options { it.flushOnEach() } + .sendString(Mono.just("ONAP_VES_HV/0.1\n")) + .then() + + return hello.then(pipe) + } + + companion object { + private val logger = Logger(NettyTcpServer::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt new file mode 100644 index 00000000..8c005d42 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -0,0 +1,9 @@ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.Routing +import org.onap.dcae.collectors.veshv.domain.VesMessage + +class Router(private val routing: Routing) { + fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message) +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt new file mode 100644 index 00000000..6ace06e4 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import com.google.protobuf.InvalidProtocolBufferException +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class VesDecoder { + + fun decode(bb: ByteBuf): VesMessage? = + try { + val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader + VesMessage(decodedHeader, bb) + } catch (ex: InvalidProtocolBufferException) { + logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" } + logger.debug("Cause", ex) + null + } + + + companion object { + private val logger = Logger(VesDecoder::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt new file mode 100644 index 00000000..af9d0b0a --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class VesHvCollector( + val wireDecoder: WireDecoder, + val protobufDecoder: VesDecoder, + val validator: MessageValidator, + val router: Router, + val sink: Sink) : Collector { + override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = + dataStream + .flatMap(this::decodeWire) + .flatMap(this::decodeProtobuf) + .filter(this::validate) + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext(this::releaseMemory) + .then() + + private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode) + + private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) + + private fun validate(msg: VesMessage): Boolean { + val valid = validator.isValid(msg) + if (!valid) { + msg.rawMessage.release() + } + return valid + } + + private fun findRoute(msg: VesMessage): Mono<RoutedMessage> { + val routedMessage = router.findDestination(msg) + return if (routedMessage == null) + Mono.empty() + else + Mono.just(routedMessage) + } + + private fun releaseMemory(msg: VesMessage) { + msg.rawMessage.release() + } + + private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { + val result = mapper(input) + return if (result == null) { + input.release() + Mono.empty() + } else { + Mono.just(result) + } + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt new file mode 100644 index 00000000..66ced2d7 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class WireDecoder { + fun decode(byteBuf: ByteBuf): ByteBuf? = + try { + WireFrame.decode(byteBuf) + .takeIf { it.isValid() } + .let { it?.payload } + } catch (ex: IndexOutOfBoundsException) { + logger.debug { "Wire protocol frame could not be decoded - input is too small" } + null + } + + companion object { + private val logger = Logger(WireDecoder::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt new file mode 100644 index 00000000..cdfcfd30 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.common.serialization.ByteBufferSerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.ipc.netty.http.client.HttpClient +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object AdapterFactory { + fun kafkaSink(): SinkProvider = KafkaSinkProvider() + + fun staticConfigurationProvider(config: CollectorConfiguration) = + object : ConfigurationProvider { + override fun invoke() = Flux.just(config) + } + + private class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + val sender = KafkaSender.create( + SenderOptions.create<CommonEventHeader, ByteBuffer>() + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)) + return KafkaSink(sender) + } + } + + fun consulConfigurationProvider(url: String): ConfigurationProvider = + ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) +} + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt new file mode 100644 index 00000000..ef6c2f76 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -0,0 +1,63 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.ves.VesEventV5 +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import java.io.StringReader +import java.time.Duration +import java.util.* +import java.util.concurrent.atomic.AtomicReference +import javax.json.Json +import javax.json.JsonObject + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter) + : ConfigurationProvider { + + + private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + private var lastConfigurationHash: AtomicReference<Int> = AtomicReference() + + override fun invoke(): Flux<CollectorConfiguration> = + Flux.interval(Duration.ZERO, REFRESH_INTERVAL) + .flatMap { http.getResponse(url) } + .filter { body -> body.hashCode() != lastConfigurationHash.get() } + .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) } + .map { str -> getConfigurationJson(str) } + .map { json -> createCollectorConfiguration(json) } + + private fun getConfigurationJson(str: String): JsonObject { + val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) + val decodedValue = String( + Base64.getDecoder().decode(response.getString("Value"))) + logger.info("Obtained new configuration from consul:\n$decodedValue") + return Json.createReader(StringReader(decodedValue)).readObject() + } + + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { + + val routing = configuration.getJsonObject("routing") + + return CollectorConfiguration( + kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"), + routing = org.onap.dcae.collectors.veshv.domain.routing { + defineRoute { + fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain"))) + toTopic(routing.getString("toTopic")) + withFixedPartitioning() + } + }.build() + ) + } + + companion object { + private const val REFRESH_INTERVAL_MINUTES: Long = 5 + private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt new file mode 100644 index 00000000..c09af015 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -0,0 +1,29 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.publisher.toMono +import reactor.ipc.netty.http.client.HttpClient +import reactor.ipc.netty.http.client.HttpClientResponse +import java.nio.charset.Charset + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +open class HttpAdapter(private val httpClient: HttpClient) { + + private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) + + open fun getResponse(url: String): Mono<String> = + httpClient.get(url) + .onErrorResume { e -> unableToGetResource(e, url) } + .flatMap { res -> res.receiveContent().toMono() } + .map { content -> content.content().toString(Charset.defaultCharset()) } + + + private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> { + logger.info("Failed to get resource on path: $url\n${e.localizedMessage}") + return Mono.empty() + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt new file mode 100644 index 00000000..8b558a85 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderRecord +import reactor.kafka.sender.SenderResult +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink { + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + val records = messages.map(this::vesToKafkaRecord) + return sender.send(records) + .doOnNext(::logException) + .filter(::isSuccessful) + .map { it.correlationMetadata() } + } + + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> { + return SenderRecord.create( + msg.topic, + msg.partition, + System.currentTimeMillis(), + msg.message.header, + msg.message.rawMessage.nioBuffer(), + msg.message) + } + + private fun logException(senderResult: SenderResult<out Any>) { + if (senderResult.exception() != null) { + logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } + } + } + + private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + + companion object { + val logger = Logger(KafkaSink::class) + } +} diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt new file mode 100644 index 00000000..8f6b76fb --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -0,0 +1,108 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.Unpooled.wrappedBuffer +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent +import org.assertj.core.api.Assertions.assertThat +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.* + +internal object MessageValidatorTest : Spek({ + + fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf { + val msg = VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) + .build() + return wrappedBuffer(msg.toByteArray()) + } + + given("Message validator") { + val cut = MessageValidator() + + on("ves hv message including header with fully initialized fields") { + val commonHeader = newBuilder() + .setVersion("1.9") + .setEventName("Sample event name") + .setDomain(Domain.HVRANMEAS) + .setEventId("Sample event Id") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(2) + .build() + + it("should accept message with fully initialized message header") { + val vesMessage = VesMessage(commonHeader, vesMessageBytes(commonHeader)) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() + } + + it("should reject message with domain other than HVRANMEAS") { + Domain.values() + .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED } + .forEach { domain -> + val header = newBuilder(commonHeader).setDomain(domain).build() + val vesMessage = VesMessage(header, vesMessageBytes(header)) + assertThat(cut.isValid(vesMessage)) + .describedAs("message with $domain domain") + .isFalse() + } + } + } + + on("ves hv message bytes") { + val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER) + it("should not accept message with default header") { + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + + + on("ves hv message including header with not initialized fields") { + val commonHeader = newBuilder() + .setVersion("1.9") + .setEventName("Sample event name") + .setEventId("Sample event Id") + .setSourceName("Sample Source") + .build() + val msg = VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!")) + .build() + val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + + it("should not accept not fully initialized message header ") { + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + } +})
\ No newline at end of file diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt new file mode 100644 index 00000000..ac91aaeb --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -0,0 +1,92 @@ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.Unpooled +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object RouterTest : Spek({ + given("sample configuration") { + val config = routing { + + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic("ves_rtpm") + withFixedPartitioning(2) + } + + defineRoute { + fromDomain(Domain.SYSLOG) + toTopic("ves_trace") + withFixedPartitioning() + } + }.build() + val cut = Router(config) + + on("message with existing route (rtpm)") { + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER) + val result = cut.findDestination(message) + + it("should have route available") { + assertThat(result).isNotNull() + } + + it("should be routed to proper partition") { + assertThat(result?.partition).isEqualTo(2) + } + + it("should be routed to proper topic") { + assertThat(result?.topic).isEqualTo("ves_rtpm") + } + + it("should be routed with a given message") { + assertThat(result?.message).isSameAs(message) + } + } + + on("message with existing route (trace)") { + val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER) + val result = cut.findDestination(message) + + it("should have route available") { + assertThat(result).isNotNull() + } + + it("should be routed to proper partition") { + assertThat(result?.partition).isEqualTo(1) + } + + it("should be routed to proper topic") { + assertThat(result?.topic).isEqualTo("ves_trace") + } + + it("should be routed with a given message") { + assertThat(result?.message).isSameAs(message) + } + } + + on("message with unknown route") { + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER) + val result = cut.findDestination(message) + + it("should not have route available") { + assertThat(result).isNull() + } + } + } +}) + +private fun vesCommonHeaderWithDomain(domain: Domain) = + CommonEventHeader.getDefaultInstance().toBuilder() + .setDomain(domain) + .build()
\ No newline at end of file diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt new file mode 100644 index 00000000..4d9c9239 --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import com.google.protobuf.ByteString +import io.netty.buffer.Unpooled.wrappedBuffer +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.test.StepVerifier +import java.nio.charset.Charset + + +internal object VesDecoderTest : Spek({ + + given("ves message decoder") { + val cut = VesDecoder() + + on("ves hv message bytes") { + val commonHeader = CommonEventHeader.getDefaultInstance() + val msg = VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements")) + .build() + val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + + + it("should decode only header and pass it on along with raw message") { + val expectedMessage = VesMessage( + commonHeader, + rawMessageBytes + ) + + assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage) + + } + } + + on("invalid ves hv message bytes") { + val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset())) + + it("should return empty result") { + assertThat(cut.decode(rawMessageBytes)).isNull() + } + } + } +}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt new file mode 100644 index 00000000..3563bf6d --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt @@ -0,0 +1,104 @@ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrame + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since May 2018 + */ +internal object WireDecoderTest : Spek({ + describe("decoding wire protocol") { + val cut = WireDecoder() + + fun decode(frame: WireFrame) = + cut.decode( + frame.encode(UnpooledByteBufAllocator.DEFAULT)) + + given("empty input") { + val input = Unpooled.EMPTY_BUFFER + + it("should yield empty result") { + assertThat(cut.decode(input)).isNull() + } + } + + given("input without 0xFF first byte") { + val input = WireFrame( + payload = Unpooled.EMPTY_BUFFER, + mark = 0x10, + majorVersion = 1, + minorVersion = 2, + payloadSize = 0) + + it("should yield empty result") { + assertThat(decode(input)).isNull() + } + } + + given("input with unsupported major version") { + val input = WireFrame( + payload = Unpooled.EMPTY_BUFFER, + mark = 0xFF, + majorVersion = 100, + minorVersion = 2, + payloadSize = 0) + + it("should yield empty result") { + assertThat(decode(input)).isNull() + } + } + + given("input with too small payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)), + mark = 0xFF, + majorVersion = 1, + minorVersion = 0, + payloadSize = 1) + + it("should yield empty result") { + assertThat(decode(input)).isNull() + } + } + + given("input with too big payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)), + mark = 0xFF, + majorVersion = 1, + minorVersion = 0, + payloadSize = 8) + + it("should yield empty result") { + assertThat(decode(input)).isNull() + } + } + + given("valid input") { + val payload = byteArrayOf(6, 9, 8, 6) + val input = WireFrame( + payload = Unpooled.wrappedBuffer(payload), + mark = 0xFF, + majorVersion = 1, + minorVersion = 0, + payloadSize = payload.size) + + + it("should yield Google Protocol Buffers payload") { + val result = decode(input)!! + + val actualPayload = ByteArray(result.readableBytes()) + result.readBytes(actualPayload) + + assertThat(actualPayload).containsExactly(*payload) + } + } + } +}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt new file mode 100644 index 00000000..bf65c859 --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -0,0 +1,60 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.Mono +import java.util.* +import kotlin.test.assertEquals + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal object ConsulConfigurationProviderTest : Spek({ + + given("valid resource url") { + val testUrl = "http://valid-url/" + val httpAdapterMock: HttpAdapter = mock() + val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock) + + whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse())) + + + it("should create valid collector configuration") { + val response = consulConfigProvider().blockFirst() + assertEquals("val1", response.kafkaBootstrapServers) + val route = response.routing.routes[0] + assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain) + assertEquals("val3", route.targetTopic) + } + } +}) + +fun constructConsulResponse(): String { + + val config = """{ + "kafkaBootstrapServers": "val1", + "routing": { + "fromDomain": 2, + "toTopic": "val3" + } + }""" + + val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) + + return """[ + { + "CreateIndex": 100, + "ModifyIndex": 200, + "LockIndex": 200, + "Key": "zip", + "Flags": 0, + "Value": "$encodedValue", + "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e" + } + ]""" +} diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt new file mode 100644 index 00000000..10e8b6bf --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -0,0 +1,60 @@ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.HttpContent +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.ipc.netty.http.client.HttpClient +import reactor.ipc.netty.http.client.HttpClientResponse +import java.nio.charset.Charset +import kotlin.test.assertEquals + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal object HttpAdapterTest : Spek({ + + given("valid resource url") { + + val httpClientMock: HttpClient = mock() + val httpAdapter = HttpAdapter(httpClientMock) + val testUrl = "http://valid-url/" + val responseContent = """{"key1": "value1", "key2": "value2"}""" + val httpResponse = createHttpResponseMock(responseContent) + whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse)) + + it("should return response string") { + assertEquals(responseContent, httpAdapter.getResponse(testUrl).block()) + } + } + + given("invalid resource url") { + + val httpClientMock: HttpClient = mock() + val httpAdapter = HttpAdapter(httpClientMock) + val testUrl = "http://invalid-url/" + whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception"))) + + + it("should return null response") { + assertEquals(null, httpAdapter.getResponse(testUrl).block()) + } + } +}) + +fun createHttpResponseMock(content: String): HttpClientResponse { + val responseMock: HttpClientResponse = mock() + val contentMock: HttpContent = mock() + val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset()) + + whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock)) + whenever(contentMock.content()).thenReturn(contentByteBuff) + + return responseMock +} diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-core/src/test/resources/logback.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file |