aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-domain
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-domain')
-rw-r--r--sources/hv-collector-domain/pom.xml131
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt56
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt34
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt50
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt35
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt102
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt48
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt38
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt86
-rw-r--r--sources/hv-collector-domain/src/main/proto/event/VesEvent.proto75
-rw-r--r--sources/hv-collector-domain/src/main/proto/measurements/README.md1
-rw-r--r--sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt282
-rw-r--r--sources/hv-collector-domain/src/test/resources/logback.xml54
13 files changed, 992 insertions, 0 deletions
diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml
new file mode 100644
index 00000000..f60cb604
--- /dev/null
+++ b/sources/hv-collector-domain/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-domain</artifactId>
+ <description>VES HighVolume Collector :: Domain</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+ <inputDirectories>
+ <include>${project.basedir}/src/main/proto/event</include>
+ </inputDirectories>
+ <outputTargets>
+ <outputTarget>
+ <type>java</type>
+ <addSources>none</addSources>
+ <outputDirectory>${protobuf-generated-files.directory}</outputDirectory>
+ </outputTarget>
+ </outputTargets>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
new file mode 100644
index 00000000..a1ebba37
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import java.nio.charset.Charset
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ByteData(private val data: ByteArray) {
+
+ fun size() = data.size
+
+ /**
+ * This will expose mutable state of the data.
+ *
+ * @return wrapped data buffer (NOT a copy)
+ */
+ fun unsafeAsArray() = data
+
+ fun writeTo(byteBuf: ByteBuf) {
+ byteBuf.writeBytes(data)
+ }
+
+ fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset)
+
+ companion object {
+ val EMPTY = ByteData(byteArrayOf())
+
+ fun readFrom(byteBuf: ByteBuf, length: Int): ByteData {
+ val dataArray = ByteArray(length)
+ byteBuf.readBytes(dataArray)
+ return ByteData(dataArray)
+ }
+ }
+}
+
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
new file mode 100644
index 00000000..7cbf3530
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+enum class PayloadContentType(val hexValue: Int) {
+ GOOGLE_PROTOCOL_BUFFER(0x0001);
+
+ companion object {
+ private val hexValues = PayloadContentType.values().map { it.hexValue }
+
+ fun isValidHexValue(hex: Int) = hexValues.contains(hex)
+ }
+}
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt
new file mode 100644
index 00000000..7f566a6d
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import arrow.core.Option
+import java.io.InputStream
+import java.nio.file.Path
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class SecurityConfiguration(
+ val sslDisable: Boolean = false,
+ val keys: Option<SslKeys>)
+
+sealed class SslKeys
+
+data class OpenSslKeys(val privateKey: Path,
+ val cert: Path,
+ val trustedCert: Path) : SslKeys()
+
+data class JdkKeys(val keyStore: StreamProvider,
+ val keyStorePassword: CharArray,
+ val trustStore: StreamProvider,
+ val trustStorePassword: CharArray) : SslKeys() {
+ fun forgetPasswords() {
+ keyStorePassword.fill('x')
+ trustStorePassword.fill('x')
+ }
+}
+
+typealias StreamProvider = () -> InputStream
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt
new file mode 100644
index 00000000..0b18337d
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+enum class VesEventDomain(val domainName: String) {
+ FAULT("fault"),
+ HEARTBEAT("heartbeat"),
+ MEASUREMENT("measurement"),
+ MOBILE_FLOW("mobileFlow"),
+ OTHER("other"),
+ PNF_REGISTRATION("pnfRegistration"),
+ SIP_SIGNALING("sipSignaling"),
+ STATE_CHANGE("stateChange"),
+ SYSLOG("syslog"),
+ THRESHOLD_CROSSING_ALERT("thresholdCrossingAlert"),
+ VOICE_QUALITY("voiceQuality"),
+ PERF3GPP("perf3gpp");
+}
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
new file mode 100644
index 00000000..7fabdf14
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+
+ fun encode(frame: WireFrameMessage): ByteBuf = allocator
+ .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+ .run {
+ writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+ writeByte(frame.versionMajor.toInt())
+ writeByte(frame.versionMinor.toInt())
+ writeZero(RESERVED_BYTE_COUNT)
+ writeShort(frame.payloadType)
+ writeInt(frame.payloadSize)
+ }
+ .also {
+ frame.payload.writeTo(it)
+ }
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
+
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
+ when {
+ isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
+ else -> parseWireFrame(byteBuf)
+ }
+
+ private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
+
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
+
+ private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ byteBuf.markReaderIndex()
+
+ val mark = byteBuf.readUnsignedByte()
+ return when (mark) {
+ WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ else -> {
+ byteBuf.resetReaderIndex()
+ Left(InvalidWireFrameMarker(mark))
+ }
+ }
+ }
+
+ @Suppress("ReturnCount")
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ val versionMajor = byteBuf.readUnsignedByte()
+ val versionMinor = byteBuf.readUnsignedByte()
+ byteBuf.skipBytes(RESERVED_BYTE_COUNT)
+ val payloadTypeRaw = byteBuf.readUnsignedShort()
+ val payloadSize = byteBuf.readInt()
+
+ if (payloadSize > maxPayloadSizeBytes) {
+ byteBuf.resetReaderIndex()
+ return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
+ }
+
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ return Left(MissingWireFramePayloadBytes)
+ }
+
+ val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+ return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+ }
+}
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
new file mode 100644
index 00000000..0d55cebb
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+sealed class WireFrameDecodingError(val message: String)
+
+
+// Invalid frame errors
+
+sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
+
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+ .format(WireFrameMessage.MARKER_BYTE, actualMarker)
+)
+
+class PayloadSizeExceeded(maxPayloadSizeBytes: Int) :
+ InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)")
+
+// Missing bytes errors
+
+sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
+
+object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
+object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
+object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
new file mode 100644
index 00000000..1eb6239f
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import org.onap.ves.VesEventOuterClass
+
+val headerRequiredFieldDescriptors = listOf(
+ "version",
+ "domain",
+ /* field "sequence" has been removed from validation, since default value "0" is acceptable */
+ "priority",
+ "eventId",
+ "eventName",
+ "lastEpochMicrosec",
+ "startEpochMicrosec",
+ "reportingEntityName",
+ "sourceName",
+ "vesEventListenerVersion")
+ .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
+
+val vesEventListenerVersionRegex = """7\.[0-9].*""".toRegex()
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
new file mode 100644
index 00000000..de37b140
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -0,0 +1,86 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * Wire frame structure is presented bellow using ASN.1 notation. Please note that official supported specification
+ * should be available on
+ * [RTD documentation](https://onap.readthedocs.io/en/latest/submodules/dcaegen2.git/docs/sections/apis/ves-hv.html).
+ *
+ * ```
+ * -- Wire Transfer Protocol (binary, defined using ASN.1 notation)
+ * -- Encoding: use "direct encoding" to the number of octets indicated in the comment [n], using network byte order.
+ *
+ * WTP DEFINITIONS ::= BEGIN
+ *
+ * -- Used to sent data from the data provider
+ * WtpData ::= SEQUENCE {
+ * magic INTEGER (0..255), -- [1] always 0xAA
+ * versionMajor INTEGER (0..255), -- [1] major interface version, forward incompatible
+ * -- with previous major version, current value: 1
+ * versionMinor INTEGER (0..255), -- [1] minor interface version, forward compatible
+ * -- with previous minor version, current value: 0
+ * reserved OCTET STRING (SIZE (3)), -- [3] reserved for future use (ignored, but use 0)
+ * payloadId INTEGER (0..65535), -- [2] payload type: 0x0000=undefined, 0x0001=ONAP VesEvent (protobuf)
+ * payloadLength INTEGER (0..4294967295). -- [4] payload length in octets
+ * payload OCTET STRING -- [length as per payloadLength]
+ * }
+ *
+ * END
+ * ```
+ *
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class WireFrameMessage(val payload: ByteData,
+ val versionMajor: Short,
+ val versionMinor: Short,
+ val payloadType: Int,
+ val payloadSize: Int
+) {
+ constructor(payload: ByteArray) : this(
+ ByteData(payload),
+ SUPPORTED_VERSION_MAJOR,
+ SUPPORTED_VERSION_MINOR,
+ PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payload.size)
+
+ fun isValid(): Boolean =
+ versionMajor == SUPPORTED_VERSION_MAJOR
+ && PayloadContentType.isValidHexValue(payloadType)
+ && payload.size() == payloadSize
+
+ companion object {
+ const val MARKER_BYTE: Short = 0xAA
+ const val RESERVED_BYTE_COUNT: Int = 3
+
+ const val SUPPORTED_VERSION_MAJOR: Short = 1
+ const val SUPPORTED_VERSION_MINOR: Short = 0
+
+ const val HEADER_SIZE =
+ 1 * java.lang.Byte.BYTES + // marker
+ 2 * java.lang.Byte.BYTES + // single byte fields
+ 1 * java.lang.Short.BYTES + // double byte fields
+ RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes
+ 1 * java.lang.Integer.BYTES // payload length
+
+ const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+ }
+}
diff --git a/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto b/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto
new file mode 100644
index 00000000..6d4c2307
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+
+message VesEvent // top-level message, currently the maximum event size supported by the HV-VES Collector is 1 MiB
+{
+ CommonEventHeader commonEventHeader=1; // required
+
+ bytes eventFields=2; // required, payload
+ // this field contains a domain-specific GPB message
+ // the field being opaque (bytes), the decoding of the payload occurs in a separate step
+ // the name of the GPB message for domain XYZ is XyzFields
+ // e.g. for domain==perf3gpp, the GPB message is Perf3gppFields
+}
+
+// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
+
+message CommonEventHeader
+{
+ string version = 1; // required, "version of the gpb common event header", current value "1.0"
+ string domain = 2; // required, "the eventing domain associated with the event", allowed values:
+ // fault, heartbeat, measurement, mobileFlow, other, pnfRegistration, sipSignaling,
+ // stateChange, syslog, thresholdCrossingAlert, voiceQuality, perf3gpp
+
+ uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
+
+ enum Priority
+ {
+ PRIORITY_NOT_PROVIDED = 0;
+ HIGH = 1;
+ MEDIUM = 2;
+ NORMAL = 3;
+ LOW = 4;
+ }
+ Priority priority = 4; // required, "processing priority"
+
+ string eventId = 5; // required, "event key that is unique to the event source"
+ string eventName = 6; // required, "unique event name"
+ string eventType = 7; // "for example - guest05, platform"
+
+ uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+
+ string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
+ string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
+ string nfVendorName = 12; // " Vendor Name providing the nf "
+
+ bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+ string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+ bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+ string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
+ string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
+ string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener", current value "7.2"
+
+ reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+ reserved 100;
+}
diff --git a/sources/hv-collector-domain/src/main/proto/measurements/README.md b/sources/hv-collector-domain/src/main/proto/measurements/README.md
new file mode 100644
index 00000000..eb69eb4a
--- /dev/null
+++ b/sources/hv-collector-domain/src/main/proto/measurements/README.md
@@ -0,0 +1 @@
+Measurements data (data placed in VesEvent.eventFields) description should be available in [RTD documentation](https://onap.readthedocs.io/en/latest/submodules/dcaegen2.git/docs/sections/apis/ves-hv.html). \ No newline at end of file
diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
new file mode 100644
index 00000000..f17a79ba
--- /dev/null
+++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -0,0 +1,282 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import arrow.core.Either
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.ObjectAssert
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.nio.charset.Charset
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object WireFrameCodecsTest : Spek({
+ val payloadAsString = "coffeebabe"
+ val maxPayloadSizeBytes = 1024
+ val encoder = WireFrameEncoder()
+ val decoder = WireFrameDecoder(maxPayloadSizeBytes)
+
+ fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
+
+ fun encodeSampleFrame() =
+ createSampleFrame().let {
+ encoder.encode(it)
+ }
+
+ describe("Wire Frame invariants") {
+
+ given("input with unsupported major version") {
+ val input = WireFrameMessage(
+ payload = ByteData.EMPTY,
+ versionMajor = 100,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = 0)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with unsupported minor version") {
+ val input = WireFrameMessage(
+ payload = ByteData.EMPTY,
+ versionMajor = 1,
+ versionMinor = 6,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = 0)
+
+ it("should pass validation") {
+ assertThat(input.isValid()).isTrue()
+ }
+ }
+
+ given("input with unsupported payload type") {
+ val input = WireFrameMessage(
+ payload = ByteData.EMPTY,
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = 0x69,
+ payloadSize = 0)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too small payload size") {
+ val input = WireFrameMessage(
+ payload = ByteData(byteArrayOf(1, 2, 3)),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = 1)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too big payload size") {
+ val input = WireFrameMessage(
+ payload = ByteData(byteArrayOf(1, 2, 3)),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = 8)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("valid input") {
+ val payload = byteArrayOf(6, 9, 8, 6)
+ val input = WireFrameMessage(
+ payload = ByteData(payload),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+ it("should pass validation") {
+ assertThat(input.isValid()).isTrue()
+ }
+ }
+
+
+ }
+
+ describe("Wire Frame codec") {
+
+ describe("encode-decode methods' compatibility") {
+ val frame = createSampleFrame()
+ val encoded = encodeSampleFrame()
+ val decoded = decoder.decodeFirst(encoded).getMessageOrFail()
+
+ it("should decode major version") {
+ assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor)
+ }
+
+ it("should decode minor version") {
+ assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor)
+ }
+
+ it("should decode payload type") {
+ assertThat(decoded.payloadType).isEqualTo(frame.payloadType)
+ }
+
+ it("should decode payload size") {
+ assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
+ }
+
+ it("should decode payload") {
+ assertThat(decoded.payload.asString())
+ .isEqualTo(payloadAsString)
+ }
+ }
+
+
+ describe("TCP framing") {
+ // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
+
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should return error when given any single byte other than marker byte") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xEE)
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should return error when length looks ok but first byte is not 0xAA") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("some garbage".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should return error when payload doesn't fit") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ buff.writerIndex(buff.writerIndex() - 2)
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAB)
+ val decoded = decoder.decodeFirst(buff).getMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
+ }
+
+ describe("payload size limit") {
+
+ it("should decode successfully when payload size is equal 1 MiB") {
+
+ val payload = ByteArray(maxPayloadSizeBytes)
+ val input = WireFrameMessage(
+ payload = ByteData(payload),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+
+ assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight())
+ }
+
+ it("should return error when payload exceeds 1 MiB") {
+
+ val payload = ByteArray(maxPayloadSizeBytes + 1)
+ val input = WireFrameMessage(
+ payload = ByteData(payload),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+ val buff = encoder.encode(input)
+
+ decoder.decodeFirst(buff)
+ .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) }
+ assertBufferIntact(buff)
+ }
+
+ it("should validate only first message") {
+
+ val payload = ByteArray(maxPayloadSizeBytes)
+ val input = WireFrameMessage(
+ payload = ByteData(payload),
+ versionMajor = 1,
+ versionMinor = 0,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+
+ assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xAA)).isRight())
+ }
+ }
+ }
+})
+
+private fun assertBufferIntact(buff: ByteBuf) {
+ assertThat(buff.refCnt()).describedAs("buffer should not be released").isEqualTo(1)
+ assertThat(buff.readerIndex()).describedAs("buffer reader index should be intact").isEqualTo(0)
+}
+
+private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {
+ fold({ assertj(assertThat(it)) }, { fail("Error expected") })
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage =
+ fold({ fail(it.message) }, { it })
+
diff --git a/sources/hv-collector-domain/src/test/resources/logback.xml b/sources/hv-collector-domain/src/test/resources/logback.xml
new file mode 100644
index 00000000..0bf2cb02
--- /dev/null
+++ b/sources/hv-collector-domain/src/test/resources/logback.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file