aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-28 15:46:50 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-29 14:41:42 +0100
commitdde383a2aa75f94c26d7949665b79cc95486a223 (patch)
tree75f3e8f564067afd0e67dbe6254183e45ca26944 /hv-collector-dcae-app-simulator/src/main
parent77f896523f2065b1da1be21545155a29edea5122 (diff)
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt74
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt88
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt101
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt66
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt67
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt27
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt70
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt62
-rw-r--r--hv-collector-dcae-app-simulator/src/main/resources/logback.xml36
9 files changed, 0 insertions, 591 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
deleted file mode 100644
index 490cde4a..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.core.getOrElse
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.io.InputStream
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation) {
- private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
-
- fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
-
- fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
- if (topics.any { it.isBlank() })
- throw IllegalArgumentException("Topic list cannot contain empty elements")
- if (topics.isEmpty())
- throw IllegalArgumentException("Topic list cannot be empty")
-
- logger.info("Received new configuration. Creating consumer for topics: $topics")
- consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
- }.fix()
-
- fun state() = consumerState.getOption().map { it.currentState() }
-
- fun resetState(): IO<Unit> = consumerState.getOption().fold(
- { IO.unit },
- { it.reset() }
- )
-
- fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
-
- private fun currentMessages(): List<ByteArray> =
- consumerState.getOption()
- .map { it.currentState().consumedMessages }
- .getOrElse(::emptyList)
-
- private fun extractTopics(topicsString: String): Set<String> =
- topicsString.substringAfter("=")
- .split(",")
- .toSet()
-
- companion object {
- private val logger = Logger(DcaeAppSimulator::class)
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
deleted file mode 100644
index e423191d..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass
-import java.io.InputStream
-import javax.json.Json
-
-class MessageStreamValidation(
- private val messageGenerator: MessageGenerator,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
-
- fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
- IO.monadError().bindingCatch {
- val messageParams = parseMessageParams(jsonDescription)
- val expectedEvents = generateEvents(messageParams).bind()
- val actualEvents = decodeConsumedEvents(consumedMessages)
- if (shouldValidatePayloads(messageParams)) {
- expectedEvents == actualEvents
- } else {
- validateHeaders(actualEvents, expectedEvents)
- }
- }.fix()
-
- private fun parseMessageParams(input: InputStream): List<MessageParameters> {
- val expectations = Json.createReader(input).readArray()
- val messageParams = messageParametersParser.parse(expectations)
-
- return messageParams.fold(
- { throw IllegalArgumentException("Parsing error: " + it.message) },
- {
- if (it.isEmpty())
- throw IllegalArgumentException("Message param list cannot be empty")
- it
- }
- )
- }
-
- private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
- parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
-
- private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
- expected: List<VesEventOuterClass.VesEvent>): Boolean {
- val consumedHeaders = actual.map { it.commonEventHeader }
- val generatedHeaders = expected.map { it.commonEventHeader }
- return generatedHeaders == consumedHeaders
- }
-
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
- messageGenerator.createMessageFlux(parameters)
- .map(WireFrameMessage::payload)
- .map(ByteData::unsafeAsArray)
- .map(VesEventOuterClass.VesEvent::parseFrom)
- .collectList()
- .asIo()
-
- private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
-
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
deleted file mode 100644
index 1eca9317..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
-import org.onap.dcae.collectors.veshv.utils.http.Responses
-import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
-import org.onap.dcae.collectors.veshv.utils.http.sendOrError
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
- private val responseValid by lazy {
- Responses.statusResponse(
- name = "valid",
- message = "validation succeeded"
- )
- }
-
- private val responseInvalid by lazy {
- Responses.statusResponse(
- name = "invalid",
- message = "validation failed",
- httpStatus = HttpStatus.BAD_REQUEST
- )
- }
-
-
- fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
- simulator.listenToTopics(kafkaTopics).map {
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(::setupHandlers)
- }
- }
-
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { body ->
- val operation = simulator.listenToTopics(body.text)
- ctx.response.sendOrError(operation)
- }
-
- }
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- ctx.response.sendOrError(simulator.resetState())
- }
- .get("messages/all/count") { ctx ->
- simulator.state().fold(
- { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
- {
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(it.messagesCount.toString())
- })
- }
- .post("messages/all/validate") { ctx ->
- ctx.request.body.then { body ->
- val response = simulator.validate(body.inputStream)
- .map { isValid ->
- if (isValid) responseValid else responseInvalid
- }
- ctx.response.sendAndHandleErrors(response)
- }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(HttpConstants.STATUS_OK).send()
- }
- }
-
- companion object {
- private const val CONTENT_TEXT = "text/plain"
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
deleted file mode 100644
index 10dedbdf..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-
-import arrow.effects.IO
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
-import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.KafkaReceiver
-import reactor.kafka.receiver.ReceiverOptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
-
- fun start(): IO<Consumer> = IO {
- val consumer = Consumer()
- receiver.receive().map(consumer::update).evaluateIo().subscribe()
- consumer
- }
-
- companion object {
- private val logger = Logger(KafkaSource::class)
-
- fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
- return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
- }
-
- fun createReceiverOptions(bootstrapServers: String,
- topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
- val props = mapOf<String, Any>(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
- ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
- ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
- )
- return ReceiverOptions.create<ByteArray, ByteArray>(props)
- .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
- .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
- .subscription(topics)
- }
- }
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
deleted file mode 100644
index 17eeb5b1..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* dcaegen2-collectors-veshv
-* ================================================================================
-* Copyright (C) 2018 NOKIA
-* ================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-
-import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
-import org.apache.commons.cli.CommandLine
-import org.apache.commons.cli.DefaultParser
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.intValue
-import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
-
-class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList: List<CommandLineOption> = listOf(
- LISTEN_PORT,
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- KAFKA_SERVERS,
- KAFKA_TOPICS
- )
-
- override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
- Option.monad().binding {
- val listenPort = cmdLine
- .intValue(LISTEN_PORT)
- .bind()
- val maxPayloadSizeBytes = cmdLine
- .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val kafkaBootstrapServers = cmdLine
- .stringValue(KAFKA_SERVERS)
- .bind()
- val kafkaTopics = cmdLine
- .stringValue(KAFKA_TOPICS)
- .map { it.split(",").toSet() }
- .bind()
-
- DcaeAppSimConfiguration(
- listenPort,
- maxPayloadSizeBytes,
- kafkaBootstrapServers,
- kafkaTopics)
- }.fix()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
deleted file mode 100644
index a6fc8053..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-
-data class DcaeAppSimConfiguration(
- val apiPort: Int,
- val maxPayloadSizeBytes: Int,
- val kafkaBootstrapServers: String,
- val kafkaTopics: Set<String>
-)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
deleted file mode 100644
index 1eefdbdb..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
-import java.util.concurrent.ConcurrentLinkedQueue
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
- val messagesCount: Int by lazy {
- messages.size
- }
-
- val consumedMessages: List<ByteArray> by lazy {
- messages.toList()
- }
-}
-
-interface ConsumerStateProvider {
- fun currentState(): ConsumerState
- fun reset(): IO<Unit>
-}
-
-class Consumer : ConsumerStateProvider {
-
- private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
-
- override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
-
- override fun reset(): IO<Unit> = IO {
- consumedMessages.clear()
- }
-
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
- logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
- consumedMessages.add(record.value())
- }
-
- companion object {
- private val logger = Logger(Consumer::class)
- }
-}
-
-class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
deleted file mode 100644
index 06ff4d59..00000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
-import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-
-private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
-private val logger = Logger(PACKAGE_NAME)
-const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
-
-fun main(args: Array<String>) =
- ArgDcaeAppSimConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startApp)
- .unsafeRunEitherSync(
- { ex ->
- logger.error("Failed to start a server", ex)
- ExitFailure(1)
- },
- {
- logger.info("Started DCAE-APP Simulator API server")
- }
- )
-
-
-private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- logger.info("Using configuration: $config")
- val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
- val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
- return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
- .start(config.apiPort, config.kafkaTopics)
- .unit()
-}
diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
deleted file mode 100644
index 48da3b18..00000000
--- a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
- <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>
- %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
- </pattern>
- </encoder>
- </appender>
-
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
- </encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
- <maxFileSize>50MB</maxFileSize>
- <maxHistory>30</maxHistory>
- <totalSizeCap>10GB</totalSizeCap>
- </rollingPolicy>
- </appender>
-
- <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
- <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
- </root>
-</configuration> \ No newline at end of file