aboutsummaryrefslogtreecommitdiffstats
path: root/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
diff options
context:
space:
mode:
Diffstat (limited to 'netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java')
-rw-r--r--netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java26
1 files changed, 13 insertions, 13 deletions
diff --git a/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java b/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
index 5fddff5..1b99220 100644
--- a/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
+++ b/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,33 +48,33 @@ public class StoreService {
this.consumerFactory = consumerFactory;
}
- List<MessageDTO> getAllMessages() {
- List<MessageDTO> messages = new ArrayList<>();
- String clientID = Long.toString(Instant.now().getEpochSecond());
- try (Consumer<String, String> consumer = consumerFactory.createConsumer(clientID, clientID)) {
+ List<Message> getAllMessages() {
+ List<Message> messages = new ArrayList<>();
+ String clientId = Long.toString(Instant.now().getEpochSecond());
+ try (Consumer<String, String> consumer = consumerFactory.createConsumer(clientId, clientId)) {
consumer.subscribe(TOPICS_TO_SUBSCRIBE);
ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS);
consumerRecords.forEach(
consumerRecord ->
- messages.add(new MessageDTO(consumerRecord.timestamp(), consumerRecord.value())));
+ messages.add(new Message(consumerRecord.timestamp(), consumerRecord.value())));
log.debug(String.format("consumed %d messages", consumerRecords.count()));
- }
+ }
return messages;
}
- List<MessageDTO> getLastMessages(long offset) {
- List<MessageDTO> messages = new ArrayList<>();
+ List<Message> getLastMessages(long offset) {
+ List<Message> messages = new ArrayList<>();
try (Consumer<String, String> consumer = createConsumer(offset)) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS);
consumerRecords.forEach(consumerRecord ->
- messages.add(new MessageDTO(consumerRecord.timestamp(), consumerRecord.value())));
+ messages.add(new Message(consumerRecord.timestamp(), consumerRecord.value())));
}
return messages;
}
private Consumer<String, String> createConsumer(long offsetFromLastIndex) {
- String clientID = Long.toString(Instant.now().getEpochSecond());
- Consumer<String, String> consumer = consumerFactory.createConsumer(clientID, clientID);
+ String clientId = Long.toString(Instant.now().getEpochSecond());
+ Consumer<String, String> consumer = consumerFactory.createConsumer(clientId, clientId);
consumer.subscribe(TOPICS_TO_SUBSCRIBE);
seekConsumerTo(consumer, offsetFromLastIndex);
return consumer;