diff options
Diffstat (limited to 'sources/hv-collector-domain')
13 files changed, 992 insertions, 0 deletions
diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml new file mode 100644 index 00000000..f60cb604 --- /dev/null +++ b/sources/hv-collector-domain/pom.xml @@ -0,0 +1,131 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-domain</artifactId> + <description>VES HighVolume Collector :: Domain</description> + + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>com.github.os72</groupId> + <artifactId>protoc-jar-maven-plugin</artifactId> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact> + <inputDirectories> + <include>${project.basedir}/src/main/proto/event</include> + </inputDirectories> + <outputTargets> + <outputTarget> + <type>java</type> + <addSources>none</addSources> + <outputDirectory>${protobuf-generated-files.directory}</outputDirectory> + </outputTarget> + </outputTargets> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-core</artifactId> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + </dependencies> + + +</project> diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt new file mode 100644 index 00000000..a1ebba37 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import java.nio.charset.Charset + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class ByteData(private val data: ByteArray) { + + fun size() = data.size + + /** + * This will expose mutable state of the data. + * + * @return wrapped data buffer (NOT a copy) + */ + fun unsafeAsArray() = data + + fun writeTo(byteBuf: ByteBuf) { + byteBuf.writeBytes(data) + } + + fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset) + + companion object { + val EMPTY = ByteData(byteArrayOf()) + + fun readFrom(byteBuf: ByteBuf, length: Int): ByteData { + val dataArray = ByteArray(length) + byteBuf.readBytes(dataArray) + return ByteData(dataArray) + } + } +} + diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt new file mode 100644 index 00000000..7cbf3530 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +enum class PayloadContentType(val hexValue: Int) { + GOOGLE_PROTOCOL_BUFFER(0x0001); + + companion object { + private val hexValues = PayloadContentType.values().map { it.hexValue } + + fun isValidHexValue(hex: Int) = hexValues.contains(hex) + } +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt new file mode 100644 index 00000000..7f566a6d --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/SecurityConfiguration.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import arrow.core.Option +import java.io.InputStream +import java.nio.file.Path + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class SecurityConfiguration( + val sslDisable: Boolean = false, + val keys: Option<SslKeys>) + +sealed class SslKeys + +data class OpenSslKeys(val privateKey: Path, + val cert: Path, + val trustedCert: Path) : SslKeys() + +data class JdkKeys(val keyStore: StreamProvider, + val keyStorePassword: CharArray, + val trustStore: StreamProvider, + val trustStorePassword: CharArray) : SslKeys() { + fun forgetPasswords() { + keyStorePassword.fill('x') + trustStorePassword.fill('x') + } +} + +typealias StreamProvider = () -> InputStream diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt new file mode 100644 index 00000000..0b18337d --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +enum class VesEventDomain(val domainName: String) { + FAULT("fault"), + HEARTBEAT("heartbeat"), + MEASUREMENT("measurement"), + MOBILE_FLOW("mobileFlow"), + OTHER("other"), + PNF_REGISTRATION("pnfRegistration"), + SIP_SIGNALING("sipSignaling"), + STATE_CHANGE("stateChange"), + SYSLOG("syslog"), + THRESHOLD_CROSSING_ALERT("thresholdCrossingAlert"), + VOICE_QUALITY("voiceQuality"), + PERF3GPP("perf3gpp"); +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt new file mode 100644 index 00000000..7fabdf14 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import arrow.core.Either +import arrow.core.Left +import arrow.core.Right +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { + + fun encode(frame: WireFrameMessage): ByteBuf = allocator + .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size()) + .run { + writeByte(WireFrameMessage.MARKER_BYTE.toInt()) + writeByte(frame.versionMajor.toInt()) + writeByte(frame.versionMinor.toInt()) + writeZero(RESERVED_BYTE_COUNT) + writeShort(frame.payloadType) + writeInt(frame.payloadSize) + } + .also { + frame.payload.writeTo(it) + } +} + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameDecoder(private val maxPayloadSizeBytes: Int) { + + fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = + when { + isEmpty(byteBuf) -> Left(EmptyWireFrame) + headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) + else -> parseWireFrame(byteBuf) + } + + private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 + + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE + + private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { + byteBuf.markReaderIndex() + + val mark = byteBuf.readUnsignedByte() + return when (mark) { + WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + else -> { + byteBuf.resetReaderIndex() + Left(InvalidWireFrameMarker(mark)) + } + } + } + + @Suppress("ReturnCount") + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { + val versionMajor = byteBuf.readUnsignedByte() + val versionMinor = byteBuf.readUnsignedByte() + byteBuf.skipBytes(RESERVED_BYTE_COUNT) + val payloadTypeRaw = byteBuf.readUnsignedShort() + val payloadSize = byteBuf.readInt() + + if (payloadSize > maxPayloadSizeBytes) { + byteBuf.resetReaderIndex() + return Left(PayloadSizeExceeded(maxPayloadSizeBytes)) + } + + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + return Left(MissingWireFramePayloadBytes) + } + + val payload = ByteData.readFrom(byteBuf, payloadSize) + + return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) + } +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt new file mode 100644 index 00000000..0d55cebb --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ + +sealed class WireFrameDecodingError(val message: String) + + +// Invalid frame errors + +sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) + +class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X" + .format(WireFrameMessage.MARKER_BYTE, actualMarker) +) + +class PayloadSizeExceeded(maxPayloadSizeBytes: Int) : + InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)") + +// Missing bytes errors + +sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) + +object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") +object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") +object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt new file mode 100644 index 00000000..1eb6239f --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import org.onap.ves.VesEventOuterClass + +val headerRequiredFieldDescriptors = listOf( + "version", + "domain", + /* field "sequence" has been removed from validation, since default value "0" is acceptable */ + "priority", + "eventId", + "eventName", + "lastEpochMicrosec", + "startEpochMicrosec", + "reportingEntityName", + "sourceName", + "vesEventListenerVersion") + .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) } + +val vesEventListenerVersionRegex = """7\.[0-9].*""".toRegex() diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt new file mode 100644 index 00000000..de37b140 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +/** + * Wire frame structure is presented bellow using ASN.1 notation. Please note that official supported specification + * should be available on + * [RTD documentation](https://onap.readthedocs.io/en/latest/submodules/dcaegen2.git/docs/sections/apis/ves-hv.html). + * + * ``` + * -- Wire Transfer Protocol (binary, defined using ASN.1 notation) + * -- Encoding: use "direct encoding" to the number of octets indicated in the comment [n], using network byte order. + * + * WTP DEFINITIONS ::= BEGIN + * + * -- Used to sent data from the data provider + * WtpData ::= SEQUENCE { + * magic INTEGER (0..255), -- [1] always 0xAA + * versionMajor INTEGER (0..255), -- [1] major interface version, forward incompatible + * -- with previous major version, current value: 1 + * versionMinor INTEGER (0..255), -- [1] minor interface version, forward compatible + * -- with previous minor version, current value: 0 + * reserved OCTET STRING (SIZE (3)), -- [3] reserved for future use (ignored, but use 0) + * payloadId INTEGER (0..65535), -- [2] payload type: 0x0000=undefined, 0x0001=ONAP VesEvent (protobuf) + * payloadLength INTEGER (0..4294967295). -- [4] payload length in octets + * payload OCTET STRING -- [length as per payloadLength] + * } + * + * END + * ``` + * + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +data class WireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadType: Int, + val payloadSize: Int +) { + constructor(payload: ByteArray) : this( + ByteData(payload), + SUPPORTED_VERSION_MAJOR, + SUPPORTED_VERSION_MINOR, + PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payload.size) + + fun isValid(): Boolean = + versionMajor == SUPPORTED_VERSION_MAJOR + && PayloadContentType.isValidHexValue(payloadType) + && payload.size() == payloadSize + + companion object { + const val MARKER_BYTE: Short = 0xAA + const val RESERVED_BYTE_COUNT: Int = 3 + + const val SUPPORTED_VERSION_MAJOR: Short = 1 + const val SUPPORTED_VERSION_MINOR: Short = 0 + + const val HEADER_SIZE = + 1 * java.lang.Byte.BYTES + // marker + 2 * java.lang.Byte.BYTES + // single byte fields + 1 * java.lang.Short.BYTES + // double byte fields + RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes + 1 * java.lang.Integer.BYTES // payload length + + const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + } +} diff --git a/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto b/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto new file mode 100644 index 00000000..6d4c2307 --- /dev/null +++ b/sources/hv-collector-domain/src/main/proto/event/VesEvent.proto @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +syntax = "proto3"; +package org.onap.ves; + +message VesEvent // top-level message, currently the maximum event size supported by the HV-VES Collector is 1 MiB +{ + CommonEventHeader commonEventHeader=1; // required + + bytes eventFields=2; // required, payload + // this field contains a domain-specific GPB message + // the field being opaque (bytes), the decoding of the payload occurs in a separate step + // the name of the GPB message for domain XYZ is XyzFields + // e.g. for domain==perf3gpp, the GPB message is Perf3gppFields +} + +// VES CommonEventHeader adapted to GPB (Google Protocol Buffers) + +message CommonEventHeader +{ + string version = 1; // required, "version of the gpb common event header", current value "1.0" + string domain = 2; // required, "the eventing domain associated with the event", allowed values: + // fault, heartbeat, measurement, mobileFlow, other, pnfRegistration, sipSignaling, + // stateChange, syslog, thresholdCrossingAlert, voiceQuality, perf3gpp + + uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" + + enum Priority + { + PRIORITY_NOT_PROVIDED = 0; + HIGH = 1; + MEDIUM = 2; + NORMAL = 3; + LOW = 4; + } + Priority priority = 4; // required, "processing priority" + + string eventId = 5; // required, "event key that is unique to the event source" + string eventName = 6; // required, "unique event name" + string eventType = 7; // "for example - guest05, platform" + + uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + + string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" + string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" + string nfVendorName = 12; // " Vendor Name providing the nf " + + bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" + string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" + bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" + string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" + string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" + string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener", current value "7.2" + + reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" + reserved 100; +} diff --git a/sources/hv-collector-domain/src/main/proto/measurements/README.md b/sources/hv-collector-domain/src/main/proto/measurements/README.md new file mode 100644 index 00000000..eb69eb4a --- /dev/null +++ b/sources/hv-collector-domain/src/main/proto/measurements/README.md @@ -0,0 +1 @@ +Measurements data (data placed in VesEvent.eventFields) description should be available in [RTD documentation](https://onap.readthedocs.io/en/latest/submodules/dcaegen2.git/docs/sections/apis/ves-hv.html).
\ No newline at end of file diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt new file mode 100644 index 00000000..f17a79ba --- /dev/null +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -0,0 +1,282 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import arrow.core.Either +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.ObjectAssert +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import java.nio.charset.Charset +import kotlin.test.assertTrue +import kotlin.test.fail + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +object WireFrameCodecsTest : Spek({ + val payloadAsString = "coffeebabe" + val maxPayloadSizeBytes = 1024 + val encoder = WireFrameEncoder() + val decoder = WireFrameDecoder(maxPayloadSizeBytes) + + fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) + + fun encodeSampleFrame() = + createSampleFrame().let { + encoder.encode(it) + } + + describe("Wire Frame invariants") { + + given("input with unsupported major version") { + val input = WireFrameMessage( + payload = ByteData.EMPTY, + versionMajor = 100, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = 0) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with unsupported minor version") { + val input = WireFrameMessage( + payload = ByteData.EMPTY, + versionMajor = 1, + versionMinor = 6, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = 0) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } + + given("input with unsupported payload type") { + val input = WireFrameMessage( + payload = ByteData.EMPTY, + versionMajor = 1, + versionMinor = 0, + payloadType = 0x69, + payloadSize = 0) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too small payload size") { + val input = WireFrameMessage( + payload = ByteData(byteArrayOf(1, 2, 3)), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = 1) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too big payload size") { + val input = WireFrameMessage( + payload = ByteData(byteArrayOf(1, 2, 3)), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = 8) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("valid input") { + val payload = byteArrayOf(6, 9, 8, 6) + val input = WireFrameMessage( + payload = ByteData(payload), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } + + + } + + describe("Wire Frame codec") { + + describe("encode-decode methods' compatibility") { + val frame = createSampleFrame() + val encoded = encodeSampleFrame() + val decoded = decoder.decodeFirst(encoded).getMessageOrFail() + + it("should decode major version") { + assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor) + } + + it("should decode minor version") { + assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor) + } + + it("should decode payload type") { + assertThat(decoded.payloadType).isEqualTo(frame.payloadType) + } + + it("should decode payload size") { + assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) + } + + it("should decode payload") { + assertThat(decoded.payload.asString()) + .isEqualTo(payloadAsString) + } + } + + + describe("TCP framing") { + // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 + + it("should return error when buffer is empty") { + val buff = Unpooled.buffer() + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) } + assertBufferIntact(buff) + } + + it("should return error when given any single byte other than marker byte") { + val buff = Unpooled.buffer() + .writeByte(0xEE) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertBufferIntact(buff) + } + + it("should return error when payload message header does not fit") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes("MOMOM".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + assertBufferIntact(buff) + } + + it("should return error when length looks ok but first byte is not 0xAA") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + .writeBytes("some garbage".toByteArray()) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } + assertBufferIntact(buff) + } + + it("should return error when payload doesn't fit") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + buff.writerIndex(buff.writerIndex() - 2) + + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } + assertBufferIntact(buff) + } + + it("should decode payload message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAB) + val decoded = decoder.decodeFirst(buff).getMessageOrFail() + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } + } + + describe("payload size limit") { + + it("should decode successfully when payload size is equal 1 MiB") { + + val payload = ByteArray(maxPayloadSizeBytes) + val input = WireFrameMessage( + payload = ByteData(payload), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + + assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight()) + } + + it("should return error when payload exceeds 1 MiB") { + + val payload = ByteArray(maxPayloadSizeBytes + 1) + val input = WireFrameMessage( + payload = ByteData(payload), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + val buff = encoder.encode(input) + + decoder.decodeFirst(buff) + .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) } + assertBufferIntact(buff) + } + + it("should validate only first message") { + + val payload = ByteArray(maxPayloadSizeBytes) + val input = WireFrameMessage( + payload = ByteData(payload), + versionMajor = 1, + versionMinor = 0, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + + assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xAA)).isRight()) + } + } + } +}) + +private fun assertBufferIntact(buff: ByteBuf) { + assertThat(buff.refCnt()).describedAs("buffer should not be released").isEqualTo(1) + assertThat(buff.readerIndex()).describedAs("buffer reader index should be intact").isEqualTo(0) +} + +private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) { + fold({ assertj(assertThat(it)) }, { fail("Error expected") }) +} + +private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage = + fold({ fail(it.message) }, { it }) + diff --git a/sources/hv-collector-domain/src/test/resources/logback.xml b/sources/hv-collector-domain/src/test/resources/logback.xml new file mode 100644 index 00000000..0bf2cb02 --- /dev/null +++ b/sources/hv-collector-domain/src/test/resources/logback.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file |