diff options
Diffstat (limited to 'test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka')
7 files changed, 0 insertions, 391 deletions
diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/Config.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/Config.java deleted file mode 100644 index 9ae564103..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/Config.java +++ /dev/null @@ -1,70 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.onap.netconfsimulator.kafka.listener.KafkaListenerHandler; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - -@Configuration -@EnableKafka -class Config { - - @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServer; - - @Value("${spring.kafka.consumer.auto-offset-reset}") - private String offsetReset; - - @Bean - ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { - ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); - containerFactory.setConsumerFactory(consumerFactory); - return containerFactory; - } - - @Bean - ConsumerFactory<String, String> consumerFactory() { - Map<String, Object> props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); - return new DefaultKafkaConsumerFactory<>(props); - } - - - @Bean - KafkaListenerHandler kafkaListenerHandler(ConsumerFactory<String, String> consumerFactory) { - return new KafkaListenerHandler(consumerFactory); - } - -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/MessageDTO.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/MessageDTO.java deleted file mode 100644 index 4311cd61f..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/MessageDTO.java +++ /dev/null @@ -1,31 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -@Getter -@AllArgsConstructor -class MessageDTO { - private long timestamp; - private String configuration; -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreController.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreController.java deleted file mode 100644 index 33bbdf7cf..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreController.java +++ /dev/null @@ -1,59 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka; - -import java.util.List; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@Slf4j -@RequestMapping("/store") -public class StoreController { - - private StoreService service; - - @Autowired - public StoreController(StoreService service) { - this.service = service; - } - - @GetMapping("/ping") - String ping() { - return "pong"; - } - - @GetMapping("cm-history") - List<MessageDTO> getAllConfigurationChanges() { - return service.getAllMessages(); - } - - @GetMapping("/less") - List<MessageDTO> less(@RequestParam(value = "offset", required = false, defaultValue = "${spring.kafka.default-offset}") long offset) { - return service.getLastMessages(offset); - } - -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java deleted file mode 100644 index 5fddff5a2..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.stereotype.Service; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -@Slf4j -@Service -public class StoreService { - - private static final String CONFIG_TOPIC = "config"; - private static final long CONSUMING_DURATION_IN_MS = 1000; - - private ConsumerFactory<String, String> consumerFactory; - static final List<String> TOPICS_TO_SUBSCRIBE = Collections.singletonList(CONFIG_TOPIC); - - @Autowired - StoreService(ConsumerFactory<String, String> consumerFactory) { - this.consumerFactory = consumerFactory; - } - - List<MessageDTO> getAllMessages() { - List<MessageDTO> messages = new ArrayList<>(); - String clientID = Long.toString(Instant.now().getEpochSecond()); - try (Consumer<String, String> consumer = consumerFactory.createConsumer(clientID, clientID)) { - consumer.subscribe(TOPICS_TO_SUBSCRIBE); - ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS); - consumerRecords.forEach( - consumerRecord -> - messages.add(new MessageDTO(consumerRecord.timestamp(), consumerRecord.value()))); - log.debug(String.format("consumed %d messages", consumerRecords.count())); - } - return messages; - } - - List<MessageDTO> getLastMessages(long offset) { - List<MessageDTO> messages = new ArrayList<>(); - try (Consumer<String, String> consumer = createConsumer(offset)) { - ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS); - consumerRecords.forEach(consumerRecord -> - messages.add(new MessageDTO(consumerRecord.timestamp(), consumerRecord.value()))); - } - return messages; - } - - private Consumer<String, String> createConsumer(long offsetFromLastIndex) { - String clientID = Long.toString(Instant.now().getEpochSecond()); - Consumer<String, String> consumer = consumerFactory.createConsumer(clientID, clientID); - consumer.subscribe(TOPICS_TO_SUBSCRIBE); - seekConsumerTo(consumer, offsetFromLastIndex); - return consumer; - } - - private void seekConsumerTo(Consumer<String, String> consumer, long offsetFromLastIndex) { - consumer.seekToEnd(consumer.assignment()); - consumer.poll(CONSUMING_DURATION_IN_MS); - TopicPartition topicPartition = consumer.assignment().iterator().next(); - long topicCurrentSize = consumer.position(topicPartition); - long indexToSeek = offsetFromLastIndex > topicCurrentSize ? 0 : topicCurrentSize - offsetFromLastIndex; - consumer.seek(topicPartition, indexToSeek); - } -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerEntry.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerEntry.java deleted file mode 100644 index e3c04c9fc..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerEntry.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka.listener; - -import lombok.Getter; -import org.springframework.kafka.listener.AbstractMessageListenerContainer; - -@Getter -public class KafkaListenerEntry { - - private String clientId; - private AbstractMessageListenerContainer listenerContainer; - - public KafkaListenerEntry(String clientId, AbstractMessageListenerContainer listenerContainer) { - this.clientId = clientId; - this.listenerContainer = listenerContainer; - } -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerHandler.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerHandler.java deleted file mode 100644 index 604315d5f..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/listener/KafkaListenerHandler.java +++ /dev/null @@ -1,67 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka.listener; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.ConsumerFactory; - -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.KafkaMessageListenerContainer; -import org.springframework.kafka.listener.MessageListener; - - -import org.springframework.kafka.support.TopicPartitionInitialOffset; - -import java.time.Instant; - -public class KafkaListenerHandler { - - private static final int PARTITION = 0; - private static final long NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW = -10L; - private static final boolean RELATIVE_TO_CURRENT = false; - private ConsumerFactory<String, String> consumerFactory; - - - @Autowired - public KafkaListenerHandler(ConsumerFactory<String, String> consumerFactory) { - this.consumerFactory = consumerFactory; - } - - - public KafkaListenerEntry createKafkaListener(MessageListener messageListener, String topicName) { - String clientId = Long.toString(Instant.now().getEpochSecond()); - ContainerProperties containerProperties = new ContainerProperties(topicName); - containerProperties.setGroupId(clientId); - KafkaMessageListenerContainer<String, String> listenerContainer = createListenerContainer(containerProperties, - topicName); - - listenerContainer.setupMessageListener(messageListener); - return new KafkaListenerEntry(clientId, listenerContainer); - } - - - KafkaMessageListenerContainer<String, String> createListenerContainer(ContainerProperties containerProperties, - String topicName) { - TopicPartitionInitialOffset config = new TopicPartitionInitialOffset(topicName, PARTITION, - NUMBER_OF_HISTORICAL_MESSAGES_TO_SHOW, RELATIVE_TO_CURRENT); - return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties, config); - } -} diff --git a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/model/KafkaMessage.java b/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/model/KafkaMessage.java deleted file mode 100644 index 90f283acf..000000000 --- a/test/mocks/pnfsimulator/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/model/KafkaMessage.java +++ /dev/null @@ -1,37 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Simulator - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.netconfsimulator.kafka.model; - -import lombok.Getter; - -@Getter -public class KafkaMessage { - private long timestamp; - private String configuration; - - public KafkaMessage(long timestamp, String configuration) { - this.timestamp = timestamp; - this.configuration = configuration; - } - - KafkaMessage() { - } -} |