diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-04-26 09:17:09 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-07-26 09:18:00 +0200 |
commit | e98fdcc3087d06b76066ae2d2c7d0bde41d7776b (patch) | |
tree | 3f6b79be2422022233b7e2f6c51064a63cba5fe1 /hv-collector-ct/src/test | |
parent | dcbb6333fede6c0cf43ac8690119911b01864d8d (diff) |
HV VES Collector seed code
Contains squashed commits up to 11fe6b63 (2018-05-30). The whole
contains a basic project structure. We are trying to put rest of the
commits one by one so we do not loose the history.
Bellow there are messages of the single commits in this squashed bulk:
Basic project setup
Create base maven project with Gitlab CI configuration.
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Merging guildeline
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Add remote branch delete command
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Sample runtime in Kotlin - PoC
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Setup project internal architecture
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Message routing
Determine target topic and partition by VES Common Header.
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Parse GPB message header
fkrzywka <filip.krzywka@nokia.com>
Set listen port based on command line args
Use Apache Commons CLI to parse cmd line args.
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Drop invalid GPB messages
Instead of propagating error and closing stream just drop the message
and proceed. Final handling logic may include closing the connection or
sending some message depending on the specification.
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Add Apache license file
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Convert to maven multi-module project
fkrzywka <filip.krzywka@nokia.com>
Component tests with current GPB schema
* Using v5 draft protobuf definition
* Code reorganized to so component boundaries are more visible
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thin logging facade over slf4j
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Introduce code analysis tools
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Implemented reading configuration from consul
Ves Common Header validation added (required parameters existance check)
Micro benchmark for direct vs on-heap NIO buffers
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Decode wire protocol and fix (most?) memory leaks
Proposed wire protocol is just a suggestion and will (should) change
in the future.
Netty's ByteBuf is a reference-counted wrapper over a memory chunk. It
is crucial to free unused buffers by means of release() method.
The general rule regarding memory management was suggested. Let's put
all memory-cleanup logic in main VesHvCollector class so other classes
could focus on their job.
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Minor cleanup
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Add license info in files
Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Change-Id: Ic484aa107eba48ad48f8ab222799e1795dffa865
Issue-ID: DCAEGEN2-601
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-ct/src/test')
7 files changed, 386 insertions, 0 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt new file mode 100644 index 00000000..f8eefa5a --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class Sut { + val configurationProvider = FakeConfigurationProvider() + val sink = FakeSink() + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) + val collectorProvider = collectorFactory.createVesHvCollectorProvider() + + val collector: Collector + get() = collectorProvider() + + fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + + return sink.sentMessages + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt new file mode 100644 index 00000000..2cfb785e --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -0,0 +1,94 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object VesHvSpecification : Spek({ + describe("VES High Volume Collector") { + system("should handle multiple HV RAN events") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + + assertThat(messages) + .describedAs("should send all events") + .hasSize(2) + } + + system("should release memory for each incoming message") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val msgWithInvalidDomain = vesMessage(Domain.OTHER) + val msgWithInvalidPayload = invalidVesMessage() + val msgWithInvalidFrame = invalidWireFrame() + val validMessage = vesMessage(Domain.HVRANMEAS) + + sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) + + assertThat(msgWithInvalidDomain.refCnt()) + .describedAs("message with invalid domain should be released") + .isEqualTo(0) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(0) + assertThat(msgWithInvalidFrame.refCnt()) + .describedAs("message with invalid frame should be released") + .isEqualTo(0) + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(0) + } + } + + describe("message routing") { + system("should direct message to a topic by means of routing configuration") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS)) + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1) + } + + system("should drop message if route was not found") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection( + vesMessage(Domain.OTHER, "first"), + vesMessage(Domain.HVRANMEAS, "second"), + vesMessage(Domain.HEARTBEAT, "third")) + + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") + } + } +}) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt new file mode 100644 index 00000000..c30ba7e1 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.jetbrains.spek.api.dsl.Pending +import org.jetbrains.spek.api.dsl.TestContainer + +internal fun TestContainer.system(description: String, body: (Sut) -> Unit) { + test("system $description", body = { body(Sut()) }) +} + +internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) { + test("system $description", Pending.Yes(reason), body = { body(Sut()) }) +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt new file mode 100644 index 00000000..e4958fdb --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.Unpooled +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import java.nio.charset.Charset +import java.util.* + +val alocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT + +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes +} + + +fun invalidVesMessage() = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) + writeInt(invalidGpb.size) // ves event size in bytes + writeBytes(invalidGpb) + +} + +fun invalidWireFrame() = alocator.buffer().run { + writeByte(0xFF) + writeByte(1) + writeByte(0) +} + +fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = + VesEvent.newBuilder() + .setCommonEventHeader( + CommonEventHeader.getDefaultInstance().toBuilder() + .setVersion("1.0") + .setEventId(id) + .setDomain(domain) + .setEventName("Sample event name") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(1)) + .setHvRanMeasFields(ByteString.EMPTY) + .build() + + + diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt new file mode 100644 index 00000000..728e0c5c --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor + + +const val HVRANMEAS_TOPIC = "ves_hvRanMeas" + +val basicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +class FakeConfigurationProvider : ConfigurationProvider { + private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() + + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) { + configStream.onNext(collectorConfiguration) + } + + override fun invoke() = configStream +}
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt new file mode 100644 index 00000000..7da72151 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class FakeSink : Sink { + private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + + val sentMessages: List<RoutedMessage> + get() = sent.toList() + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + return messages.doOnNext(sent::addLast).map { it.message } + } +} diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-ct/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file |