summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorKate Hsuan <kate.hsuan@qct.io>2019-05-21 10:31:03 +0800
committerKate Hsuan <kate.hsuan@qct.io>2019-05-22 10:24:50 +0800
commit123de46e9fdf6d54be08bd4339ce1f347180e329 (patch)
tree70fac8659d1c45b608a21878694835bfb7acbcbd /components/datalake-handler/feeder/src/main/java/org
parent2845d202c5689adf3b049ec32d1e15b8b6966f58 (diff)
Bug fix
1. Fix feeder start/stop exceptions- "The Kafka consumer is NOT thread-safe" exception. 2. Modify REST format for feeder start/stop/status. Issue-ID: DCAEGEN2-1437 Change-Id: I7f79243a0098e6fa17b06866ef6dfc3f71b20dc8 Signed-off-by: Kate Hsuan <kate.hsuan@qct.io>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java3
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java38
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java1
3 files changed, 27 insertions, 15 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 9106185e..2a72a76f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -67,4 +67,7 @@ public class ApplicationConfiguration {
private int hdfsBufferSize;
private long hdfsFlushInterval;
private int hdfsBatchSize;
+
+ //Version
+ private String DatalakeVersion;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 3d296d5f..4fc9b7b6 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -21,14 +21,13 @@ package org.onap.datalake.feeder.controller;
import java.io.IOException;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.service.PullService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
import io.swagger.annotations.ApiOperation;
@@ -40,35 +39,45 @@ import io.swagger.annotations.ApiOperation;
*/
@RestController
-@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+@RequestMapping(value = "/feeder", produces = { MediaType.APPLICATION_JSON_VALUE })
public class FeederController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private PullService pullService;
+
+ @Autowired
+ ApplicationConfiguration config;
/**
* @return message that application is started
* @throws IOException
*/
- @GetMapping("/start")
+ @PostMapping("/start")
+ @ResponseBody
@ApiOperation(value="Start pulling data.")
public String start() throws IOException {
log.info("DataLake feeder starting to pull data from DMaaP...");
- pullService.start();
- return "DataLake feeder is running.";
+ if(pullService.isRunning() == false) {
+ pullService.start();
+ }
+ return "{\"running\": true}";
}
/**
* @return message that application stop process is triggered
*/
- @GetMapping("/stop")
+ @PostMapping("/stop")
+ @ResponseBody
@ApiOperation(value="Stop pulling data.")
- public String stop() {
- pullService.shutdown();
+ public String stop() {
+ if(pullService.isRunning() == true)
+ {
+ pullService.shutdown();
+ }
log.info("DataLake feeder is stopped.");
- return "DataLake feeder is stopped.";
+ return "{\"running\": false}";
}
/**
* @return feeder status
@@ -77,7 +86,8 @@ public class FeederController {
@ApiOperation(value="Retrieve feeder status.")
public String status() {
String status = "Feeder is running: "+pullService.isRunning();
- log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
- return status;
- }
+ log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+
+ return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}";
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index 1154b3a9..3a07e2f9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -155,7 +155,6 @@ public class PullThread implements Runnable {
public void shutdown() {
active.set(false);
consumer.wakeup();
- consumer.unsubscribe();
}
private class DummyRebalanceListener implements ConsumerRebalanceListener {