diff options
Diffstat (limited to 'components/datalake-handler/feeder/src')
2 files changed, 141 insertions, 25 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index e9f36b2d..ce671a90 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -48,7 +48,7 @@ import org.springframework.stereotype.Service; /** * Thread that pulls messages from DMaaP and save them to Big Data DBs - * + * * @author Guobiao Mo * */ @@ -65,11 +65,11 @@ public class PullThread implements Runnable { @Autowired private ApplicationConfiguration config; - + private final Logger log = LoggerFactory.getLogger(this.getClass()); private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text - private int id; + private int id; private final AtomicBoolean active = new AtomicBoolean(false); private boolean async; @@ -112,33 +112,34 @@ public class PullThread implements Runnable { List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop log.info("Thread {} going to subscribe to topics: {}", id, topics); - + consumer.subscribe(topics, rebalanceListener); while (active.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); - - List<Pair<Long, String>> messages = new ArrayList<>(records.count()); - for (TopicPartition partition : records.partitions()) { - messages.clear(); - List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); - for (ConsumerRecord<String, String> record : partitionRecords) { - messages.add(Pair.of(record.timestamp(), record.value())); - //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); - } - storeService.saveMessages(partition.topic(), messages); - log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit - long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); - } - } - - if (async) {//for high Throughput, async commit offset in batch to Kafka - consumer.commitAsync(); - } + if (records != null) { + List<Pair<Long, String>> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } } } catch (Exception e) { log.error("Puller {} run(): exception={}", id, e.getMessage()); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java new file mode 100644 index 00000000..713d8b19 --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java @@ -0,0 +1,115 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : DATALAKE + * ================================================================================ + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.datalake.feeder.controller; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.service.DmaapService; +import org.onap.datalake.feeder.service.PullService; +import org.onap.datalake.feeder.service.PullThread; +import org.springframework.context.ApplicationContext; + +import java.io.IOException; +import java.lang.reflect.Field; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + + +public class FeederControllerTest { + + @InjectMocks + private PullService pullService1; + + @Mock + private ApplicationConfiguration config; + + @Mock + private ApplicationContext context; + + @Mock + private DmaapService dmaapService1; + + @Mock + private KafkaConsumer<String, String> kafkaConsumer; + + @Before + public void setupTest() { + MockitoAnnotations.initMocks(this); + } + + private void setAccessPrivateFields(FeederController feederController) throws NoSuchFieldException, + IllegalAccessException { + Field pullService = feederController.getClass().getDeclaredField("pullService"); + pullService.setAccessible(true); + pullService.set(feederController, pullService1); + } + + @Test + public void testStart() throws IOException, NoSuchFieldException, IllegalAccessException { + FeederController feederController = new FeederController(); + setAccessPrivateFields(feederController); + PullService pullService2 = new PullService(); + Field applicationConfig = pullService2.getClass().getDeclaredField("config"); + applicationConfig.setAccessible(true); + applicationConfig.set(pullService2, config); + Field applicationContext = pullService2.getClass().getDeclaredField("context"); + applicationContext.setAccessible(true); + applicationContext.set(pullService2, context); + when(config.getKafkaConsumerCount()).thenReturn(1); + PullThread pullThread = new PullThread(1); + Field dmaapService = pullThread.getClass().getDeclaredField("dmaapService"); + dmaapService.setAccessible(true); + dmaapService.set(pullThread, dmaapService1); + Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer"); + kafkaConsumer1.setAccessible(true); + kafkaConsumer1.set(pullThread, kafkaConsumer); + applicationConfig = pullThread.getClass().getDeclaredField("config"); + applicationConfig.setAccessible(true); + applicationConfig.set(pullThread, config); + when(context.getBean(PullThread.class, 0)).thenReturn(pullThread); + ConsumerRecords<String, String> records = ConsumerRecords.empty(); + when(kafkaConsumer.poll(2)).thenReturn(records); + String start = feederController.start(); + assertEquals("DataLake feeder is running.", start); + } + + @Test + public void testStop() throws NoSuchFieldException, IllegalAccessException { + FeederController feederController = new FeederController(); + setAccessPrivateFields(feederController); + String stop = feederController.stop(); + assertEquals("DataLake feeder is stopped.", stop); + } + + @Test + public void testStatus() throws NoSuchFieldException, IllegalAccessException { + FeederController feederController = new FeederController(); + setAccessPrivateFields(feederController); + String status = feederController.status(); + assertEquals("Feeder is running: false", status); + } +} |