diff options
Diffstat (limited to 'hv-collector-ct')
8 files changed, 486 insertions, 0 deletions
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml new file mode 100644 index 00000000..6ad304f7 --- /dev/null +++ b/hv-collector-ct/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.veshv</groupId> + <artifactId>ves-hv-collector</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-ct</artifactId> + <description>VES HighVolume Collector :: Component tests</description> + + <properties> + <failIfMissingUnitTests>false</failIfMissingUnitTests> + <failIfMissingComponentTests>true</failIfMissingComponentTests> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt new file mode 100644 index 00000000..f8eefa5a --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class Sut { + val configurationProvider = FakeConfigurationProvider() + val sink = FakeSink() + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) + val collectorProvider = collectorFactory.createVesHvCollectorProvider() + + val collector: Collector + get() = collectorProvider() + + fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + + return sink.sentMessages + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt new file mode 100644 index 00000000..2cfb785e --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -0,0 +1,94 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object VesHvSpecification : Spek({ + describe("VES High Volume Collector") { + system("should handle multiple HV RAN events") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + + assertThat(messages) + .describedAs("should send all events") + .hasSize(2) + } + + system("should release memory for each incoming message") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val msgWithInvalidDomain = vesMessage(Domain.OTHER) + val msgWithInvalidPayload = invalidVesMessage() + val msgWithInvalidFrame = invalidWireFrame() + val validMessage = vesMessage(Domain.HVRANMEAS) + + sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) + + assertThat(msgWithInvalidDomain.refCnt()) + .describedAs("message with invalid domain should be released") + .isEqualTo(0) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(0) + assertThat(msgWithInvalidFrame.refCnt()) + .describedAs("message with invalid frame should be released") + .isEqualTo(0) + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(0) + } + } + + describe("message routing") { + system("should direct message to a topic by means of routing configuration") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS)) + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1) + } + + system("should drop message if route was not found") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection( + vesMessage(Domain.OTHER, "first"), + vesMessage(Domain.HVRANMEAS, "second"), + vesMessage(Domain.HEARTBEAT, "third")) + + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") + } + } +}) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt new file mode 100644 index 00000000..c30ba7e1 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.jetbrains.spek.api.dsl.Pending +import org.jetbrains.spek.api.dsl.TestContainer + +internal fun TestContainer.system(description: String, body: (Sut) -> Unit) { + test("system $description", body = { body(Sut()) }) +} + +internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) { + test("system $description", Pending.Yes(reason), body = { body(Sut()) }) +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt new file mode 100644 index 00000000..e4958fdb --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.Unpooled +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import java.nio.charset.Charset +import java.util.* + +val alocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT + +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes +} + + +fun invalidVesMessage() = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) + writeInt(invalidGpb.size) // ves event size in bytes + writeBytes(invalidGpb) + +} + +fun invalidWireFrame() = alocator.buffer().run { + writeByte(0xFF) + writeByte(1) + writeByte(0) +} + +fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = + VesEvent.newBuilder() + .setCommonEventHeader( + CommonEventHeader.getDefaultInstance().toBuilder() + .setVersion("1.0") + .setEventId(id) + .setDomain(domain) + .setEventName("Sample event name") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(1)) + .setHvRanMeasFields(ByteString.EMPTY) + .build() + + + diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt new file mode 100644 index 00000000..728e0c5c --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor + + +const val HVRANMEAS_TOPIC = "ves_hvRanMeas" + +val basicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +class FakeConfigurationProvider : ConfigurationProvider { + private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() + + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) { + configStream.onNext(collectorConfiguration) + } + + override fun invoke() = configStream +}
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt new file mode 100644 index 00000000..7da72151 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class FakeSink : Sink { + private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + + val sentMessages: List<RoutedMessage> + get() = sent.toList() + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + return messages.doOnNext(sent::addLast).map { it.message } + } +} diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-ct/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file |