diff options
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
25 files changed, 0 insertions, 1368 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt deleted file mode 100644 index dd0111bc..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.boundary - -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import reactor.core.publisher.Flux - -interface Sink { - fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> -} - -interface Metrics { - fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) -} - -@FunctionalInterface -interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink - - companion object { - fun just(sink: Sink): SinkProvider = - object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink - } - } -} - -interface ConfigurationProvider { - operator fun invoke(): Flux<CollectorConfiguration> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt deleted file mode 100644 index 3c85a9b1..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.boundary - -import arrow.core.Option -import arrow.effects.IO -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> -} - -typealias CollectorProvider = () -> Option<Collector> - -interface Server { - fun start(): IO<ServerHandle> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt deleted file mode 100644 index 5c96e1c5..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.impl.Router -import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.getOption -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider, - private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { - - fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() - configuration() - .map(this::createVesHvCollector) - .doOnNext { - logger.info("Using updated configuration for new connections") - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error("Failed to acquire configuration from consul") - healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) - } - .subscribe(collector::set) - return collector::getOption - } - - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } - - companion object { - private val logger = Logger(CollectorFactory::class) - } -} - diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt deleted file mode 100644 index dce933ab..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server = - NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt deleted file mode 100644 index fb949079..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors -import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -internal object MessageValidator { - - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } - - private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = - headerRequiredFieldDescriptors - .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) - -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt deleted file mode 100644 index cee658b6..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Option -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.Routing -import org.onap.dcae.collectors.veshv.model.VesMessage - -class Router(private val routing: Routing) { - fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt deleted file mode 100644 index 1d43588f..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Try -import arrow.core.Option -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.VesEvent - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class VesDecoder { - - fun decode(bytes: ByteData): Option<VesMessage> = - Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) - }.toOption() -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt deleted file mode 100644 index b700f135..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Option -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, - private val protobufDecoder: VesDecoder, - private val router: Router, - private val sink: Sink, - private val metrics: Metrics) : Collector { - - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } - - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux - .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } - - private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } - - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { Mono.empty() }, - { Mono.just(it) }) - - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - - companion object { - private val logger = Logger(VesHvCollector::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt deleted file mode 100644 index 8c16736d..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.netty.http.client.HttpClient - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object AdapterFactory { - fun kafkaSink(): SinkProvider = KafkaSinkProvider() - fun loggingSink(): SinkProvider = LoggingSinkProvider() - - fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) - - private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt deleted file mode 100644 index ec7c60c0..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ /dev/null @@ -1,122 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.io.StringReader -import java.time.Duration -import java.util.concurrent.atomic.AtomicReference -import javax.json.Json -import javax.json.JsonObject - - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal class ConsulConfigurationProvider(private val http: HttpAdapter, - private val url: String, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - retrySpec: Retry<Any> - -) : ConfigurationProvider { - - private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) - private val retry = retrySpec - .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) - healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - } - - constructor(http: HttpAdapter, - params: ConfigurationProviderParams) : this( - http, - params.configurationUrl, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random()) - ) - - override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(firstRequestDelay, requestInterval) - .concatMap { askForConfig() } - .flatMap(::filterDifferentValues) - .map(::parseJsonResponse) - .map(::createCollectorConfiguration) - .retryWhen(retry) - - private fun askForConfig(): Mono<String> = http.get(url) - - private fun filterDifferentValues(configurationString: String) = - hashOf(configurationString).let { - if (it == lastConfigurationHash.get()) { - Mono.empty() - } else { - lastConfigurationHash.set(it) - Mono.just(configurationString) - } - } - - private fun hashOf(str: String) = str.hashCode() - - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readObject() - - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } - val routing = configuration.getJsonArray("collector.routing") - - return CollectorConfiguration( - kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"), - routing = org.onap.dcae.collectors.veshv.model.routing { - for (route in routing) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString("fromDomain")) - toTopic(routeObj.getString("toTopic")) - withFixedPartitioning() - } - } - }.build() - ) - } - - companion object { - private const val MAX_RETRIES = 5L - private const val BACKOFF_INTERVAL_FACTOR = 30L - private val logger = Logger(ConsulConfigurationProvider::class) - } -} - diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt deleted file mode 100644 index bdce6f73..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import io.netty.handler.codec.http.HttpStatusClass -import org.slf4j.LoggerFactory -import reactor.core.publisher.Mono -import reactor.netty.http.client.HttpClient - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -open class HttpAdapter(private val httpClient: HttpClient) { - - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get() - .uri(url + createQueryString(queryParams)) - .responseSingle { response, content -> - if (response.status().codeClass() == HttpStatusClass.SUCCESS) - content.asString() - else { - val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" - Mono.error(IllegalStateException(errorMessage)) - } - } - .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) - } - - private fun createQueryString(params: Map<String, Any>): String { - if (params.isEmpty()) - return "" - - val builder = StringBuilder("?") - params.forEach { (key, value) -> - builder - .append(key) - .append("=") - .append(value) - .append("&") - - } - - return builder.removeSuffix("&").toString() - } - -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt deleted file mode 100644 index 5f4bf354..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -internal class LoggingSinkProvider : SinkProvider { - - override fun invoke(config: CollectorConfiguration): Sink { - return object : Sink { - private val totalMessages = AtomicLong() - private val totalBytes = AtomicLong() - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = - messages - .doOnNext(this::logMessage) - - private fun logMessage(msg: RoutedMessage) { - val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) - val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } - if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) - else - logger.trace(logMessageSupplier) - } - - } - } - - companion object { - const val INFO_LOGGING_FREQ = 100_000 - private val logger = Logger(LoggingSinkProvider::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt deleted file mode 100644 index a0c22418..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderRecord -import reactor.kafka.sender.SenderResult -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { - private val sentMessages = AtomicLong(0) - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - val records = messages.map(this::vesToKafkaRecord) - val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) - .map { it.correlationMetadata() } - - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } - } - - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { - return SenderRecord.create( - msg.topic, - msg.partition, - System.currentTimeMillis(), - msg.message.header, - msg.message, - msg) - } - - private fun logException(senderResult: SenderResult<out Any>) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - - private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { - val msgNum = sentMessages.incrementAndGet() - "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" - } - } - - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null - - companion object { - val logger = Logger(KafkaSink::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt deleted file mode 100644 index 18191952..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.apache.kafka.clients.producer.ProducerConfig -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) - } - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt deleted file mode 100644 index 4e9932cc..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import com.google.protobuf.MessageLite -import org.apache.kafka.common.serialization.Serializer - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class ProtobufSerializer : Serializer<MessageLite> { - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - // no configuration - } - - override fun serialize(topic: String?, data: MessageLite?): ByteArray? = - data?.toByteArray() - - override fun close() { - // cleanup not needed - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt deleted file mode 100644 index 7a6ac7c8..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.apache.kafka.common.serialization.Serializer -import org.onap.dcae.collectors.veshv.model.VesMessage - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class VesMessageSerializer : Serializer<VesMessage> { - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - } - - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() - - override fun close() { - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt deleted file mode 100644 index e535300a..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.socket - -import arrow.core.getOrElse -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory -import org.onap.dcae.collectors.veshv.utils.NettyServerHandle -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux -import reactor.netty.Connection -import reactor.netty.NettyInbound -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpServer -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: ServerSslContextFactory, - private val collectorProvider: CollectorProvider) : Server { - - override fun start(): IO<ServerHandle> = IO { - val tcpServer = TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } - .configureSsl() - .handle(this::handleConnection) - - NettyServerHandle(tcpServer.bindNow()) - } - - private fun TcpServer.configureSsl() = - sslContextFactory - .createSslContext(serverConfig.securityConfiguration) - .map { sslContext -> - this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } - - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) - } - ) - - private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .receive() - .retain() - - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { - onReadIdle(timeout.toMillis()) { - logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." - } - disconnectClient() - } - return this - } - - private fun Connection.disconnectClient() { - channel().close().addListener { - if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } - else - logger.warn("Channel close failed", it.cause()) - } - } - - private fun Connection.logConnectionClosed(): Connection { - onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") - } - return this - } - - companion object { - private val logger = Logger(NettyTcpServer::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt deleted file mode 100644 index 4a2ef6b2..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import arrow.effects.IO -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError -import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError -import reactor.core.publisher.Flux -import reactor.core.publisher.SynchronousSink - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class WireChunkDecoder( - private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() - - fun release() { - streamBuffer.release() - } - - fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { - logIncomingMessage(byteBuf) - if (byteBuf.readableBytes() == 0) { - byteBuf.release() - Flux.empty() - } else { - streamBuffer.addComponent(true, byteBuf) - generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } - .doFinally { streamBuffer.discardReadComponents() } - } - } - - private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> - decoder.decodeFirst(streamBuffer) - .fold(onError(next), onSuccess(next)) - .unsafeRunSync() - } - - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> - when (err) { - is InvalidWireFrame -> IO { - next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { - logEndOfData() - next.complete() - } - } - } - - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } - } - - private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } - } - - private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } - } - - private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } - } - - companion object { - val logger = Logger(WireChunkDecoder::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt deleted file mode 100644 index 83a7cd85..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class WireFrameException(error: WireFrameDecodingError) - : Exception("${error::class.simpleName}: ${error.message}") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt deleted file mode 100644 index ec546c7d..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt deleted file mode 100644 index 9de34498..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import java.time.Duration - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since July 2018 - */ -data class ConfigurationProviderParams(val configurationUrl: String, - val firstRequestDelay: Duration, - val requestInterval: Duration) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt deleted file mode 100644 index 782877e3..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt deleted file mode 100644 index 85117684..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import java.net.InetSocketAddress -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class ServerConfiguration( - val serverListenAddress: InetSocketAddress, - val configurationProviderParams: ConfigurationProviderParams, - val securityConfiguration: SecurityConfiguration, - val idleTimeout: Duration, - val healthCheckApiListenAddress: InetSocketAddress, - val maximumPayloadSizeBytes: Int, - val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt deleted file mode 100644 index f5bfcce1..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt deleted file mode 100644 index bab95c57..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import arrow.core.Option -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -data class Routing(val routes: List<Route>) { - - fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) -} - -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { - - fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain - - operator fun invoke(message: VesMessage): RoutedMessage = - RoutedMessage(targetTopic, partitioning(message.header), message) -} - - -/* -Configuration DSL - */ - -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { - val conf = RoutingBuilder() - conf.init() - return conf -} - -class RoutingBuilder { - private val routes: MutableList<RouteBuilder> = mutableListOf() - - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { - val rule = RouteBuilder() - rule.init() - routes.add(rule) - return rule - } - - fun build() = Routing(routes.map { it.build() }.toList()) -} - -class RouteBuilder { - - private lateinit var domain: String - private lateinit var targetTopic: String - private lateinit var partitioning: (CommonEventHeader) -> Int - - fun fromDomain(domain: String) { - this.domain = domain - } - - fun toTopic(targetTopic: String) { - this.targetTopic = targetTopic - } - - fun withFixedPartitioning(num: Int = 0) { - partitioning = { num } - } - - fun build() = Route(domain, targetTopic, partitioning) - -} |