summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java51
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java115
2 files changed, 141 insertions, 25 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index e9f36b2d..ce671a90 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -48,7 +48,7 @@ import org.springframework.stereotype.Service;
/**
* Thread that pulls messages from DMaaP and save them to Big Data DBs
- *
+ *
* @author Guobiao Mo
*
*/
@@ -65,11 +65,11 @@ public class PullThread implements Runnable {
@Autowired
private ApplicationConfiguration config;
-
+
private final Logger log = LoggerFactory.getLogger(this.getClass());
private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
- private int id;
+ private int id;
private final AtomicBoolean active = new AtomicBoolean(false);
private boolean async;
@@ -112,33 +112,34 @@ public class PullThread implements Runnable {
List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
+
consumer.subscribe(topics, rebalanceListener);
while (active.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
+ if (records != null) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
}
} catch (Exception e) {
log.error("Puller {} run(): exception={}", id, e.getMessage());
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
new file mode 100644
index 00000000..713d8b19
--- /dev/null
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DATALAKE
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.datalake.feeder.controller;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.PullService;
+import org.onap.datalake.feeder.service.PullThread;
+import org.springframework.context.ApplicationContext;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+public class FeederControllerTest {
+
+ @InjectMocks
+ private PullService pullService1;
+
+ @Mock
+ private ApplicationConfiguration config;
+
+ @Mock
+ private ApplicationContext context;
+
+ @Mock
+ private DmaapService dmaapService1;
+
+ @Mock
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ @Before
+ public void setupTest() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ private void setAccessPrivateFields(FeederController feederController) throws NoSuchFieldException,
+ IllegalAccessException {
+ Field pullService = feederController.getClass().getDeclaredField("pullService");
+ pullService.setAccessible(true);
+ pullService.set(feederController, pullService1);
+ }
+
+ @Test
+ public void testStart() throws IOException, NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ PullService pullService2 = new PullService();
+ Field applicationConfig = pullService2.getClass().getDeclaredField("config");
+ applicationConfig.setAccessible(true);
+ applicationConfig.set(pullService2, config);
+ Field applicationContext = pullService2.getClass().getDeclaredField("context");
+ applicationContext.setAccessible(true);
+ applicationContext.set(pullService2, context);
+ when(config.getKafkaConsumerCount()).thenReturn(1);
+ PullThread pullThread = new PullThread(1);
+ Field dmaapService = pullThread.getClass().getDeclaredField("dmaapService");
+ dmaapService.setAccessible(true);
+ dmaapService.set(pullThread, dmaapService1);
+ Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer");
+ kafkaConsumer1.setAccessible(true);
+ kafkaConsumer1.set(pullThread, kafkaConsumer);
+ applicationConfig = pullThread.getClass().getDeclaredField("config");
+ applicationConfig.setAccessible(true);
+ applicationConfig.set(pullThread, config);
+ when(context.getBean(PullThread.class, 0)).thenReturn(pullThread);
+ ConsumerRecords<String, String> records = ConsumerRecords.empty();
+ when(kafkaConsumer.poll(2)).thenReturn(records);
+ String start = feederController.start();
+ assertEquals("DataLake feeder is running.", start);
+ }
+
+ @Test
+ public void testStop() throws NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ String stop = feederController.stop();
+ assertEquals("DataLake feeder is stopped.", stop);
+ }
+
+ @Test
+ public void testStatus() throws NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ String status = feederController.status();
+ assertEquals("Feeder is running: false", status);
+ }
+}