Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
-rw-r--r-- components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java | 10
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java | 23
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java | 136
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java | 168
-rwxr-xr-x components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java | 54
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java | 12
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java | 149
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java | 52
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java | 58
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java | 92
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java | 104
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java | 73
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java | 64
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java | 147
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java | 134
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java | 86
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java) | 5
-rwxr-xr-x components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java | 48
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java | 39
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java | 64
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java | 87
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java | 54
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java | 38
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java | 2
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java | 35
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java | 36
-rwxr-xr-x components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java | 35
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java | 35
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java | 35
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java | 4
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java | 53
-rwxr-xr-x components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java | 272
-rwxr-xr-x components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java | 62
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java | 50
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java | 87
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java | 63
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java | 38
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java | 90
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java | 123
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java | 119
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java) | 52
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java | 39
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java) | 73
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java) | 85
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java) | 49
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java | 27
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java | 122
-rw-r--r-- components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java | 1
48 files changed, 2729 insertions(+), 555 deletions(-)
diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
index e7a8e1b9..cde4d43d 100644
--- a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
+++ b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
@@ -42,15 +42,7 @@ public class CollectibleDocumentFieldNameValidator implements FieldNameValidator
throw new IllegalArgumentException("Field name can not be null");
}
- /* dl change
- if (fieldName.contains(".")) {
- return false;
- }*/
-
- if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {
- return false;
- }
- return true;
+ return !fieldName.startsWith("$") || EXCEPTIONS.contains(fieldName);
}
@Override
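
The refactored check above folds the branchy $-prefix test into a single boolean expression, and the commented-out dot check ("dl change") means dotted field names are now accepted. A minimal standalone sketch of the equivalence; the EXCEPTIONS values here are assumed for illustration, not copied from the driver:

    import java.util.Arrays;
    import java.util.List;

    public class FieldNameCheckDemo {
        // stand-in for the validator's whitelist of allowed $-prefixed names
        private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");

        static boolean validateOld(String fieldName) {
            // original branching form
            if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {
                return false;
            }
            return true;
        }

        static boolean validateNew(String fieldName) {
            // refactored single-expression form from the hunk
            return !fieldName.startsWith("$") || EXCEPTIONS.contains(fieldName);
        }

        public static void main(String[] args) {
            for (String f : new String[] {"name", "$db", "$where", "a.b"}) {
                // both forms agree, and dotted names are now accepted
                System.out.printf("%s -> old=%b new=%b%n", f, validateOld(f), validateNew(f));
            }
        }
    }
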
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 73067182..b93924c4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -20,6 +20,8 @@
package org.onap.datalake.feeder.config;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -41,6 +43,8 @@ import lombok.Setter;
@EnableAutoConfiguration
public class ApplicationConfiguration {
+ final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+
//App general
private boolean async;
private boolean enableSSL;
@@ -50,16 +54,7 @@ public class ApplicationConfiguration {
private String defaultTopicName;
- //DMaaP
- private String dmaapZookeeperHostPort;
- private String dmaapKafkaHostPort;
- private String dmaapKafkaGroup;
- private long dmaapKafkaTimeout;
- private String[] dmaapKafkaExclude;
-
- private int dmaapCheckNewTopicInterval; //in millisecond
-
- private int kafkaConsumerCount;
+ private long checkTopicInterval; //in millisecond
private String elasticsearchType;
@@ -70,4 +65,12 @@ public class ApplicationConfiguration {
//Version
private String datalakeVersion;
+
+ //Kibana
+ private String kibanaDashboardImportApi;
+ private Integer kibanaPort;
+
+ //Elasticsearch
+ private String esTemplateMappingApi;
+ private Integer esPort;
}
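
The new shutdownLock suggests coordinating in-flight message handling with feeder shutdown: workers share the read lock while processing, and shutdown takes the write lock so it only proceeds once they drain. A sketch of that pattern under this assumption; the class and method names below are illustrative, not the feeder's actual API:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ShutdownLockDemo {
        private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
        private volatile boolean active = true;

        // pullers take the read lock while processing, so many can run concurrently
        public void process(String message) {
            shutdownLock.readLock().lock();
            try {
                if (active) {
                    System.out.println("storing " + message);
                }
            } finally {
                shutdownLock.readLock().unlock();
            }
        }

        // shutdown takes the write lock, which waits for all in-flight processing
        public void shutdown() {
            shutdownLock.writeLock().lock();
            try {
                active = false;
            } finally {
                shutdownLock.writeLock().unlock();
            }
        }
    }
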
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
index 7e364332..cff29596 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -24,14 +24,13 @@ import java.util.*;
import javax.servlet.http.HttpServletResponse;
-import io.swagger.annotations.*;
import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DesignType;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
-import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
-import org.onap.datalake.feeder.controller.domain.DbConfig;
+import org.onap.datalake.feeder.dto.DbConfig;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.repository.DesignTypeRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,15 +38,12 @@ import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.*;
-import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
/**
* This controller manages the big data storage settings. All the settings are
* saved in database.
- *
+ *
* @author Guobiao Mo
*
*/
@@ -64,16 +60,12 @@ public class DbController {
private DbRepository dbRepository;
@Autowired
- private TopicRepository topicRepository;
-
- @Autowired
- private DbService dbService;
+ private DesignTypeRepository designTypeRepository;
- //list all dbs
+ //list all dbs
@GetMapping("")
@ResponseBody
@ApiOperation(value="Gat all databases name")
- //public Iterable<Db> list() throws IOException {
public List<String> list() throws IOException {
Iterable<Db> ret = dbRepository.findAll();
List<String> retString = new ArrayList<>();
@@ -86,6 +78,21 @@ public class DbController {
return retString;
}
+ @GetMapping("/idAndName/{id}")
+ @ResponseBody
+ @ApiOperation(value="Get all databases id and name by designTypeId")
+ public Map<Integer, String> listIdAndName(@PathVariable String id) {
+ Optional<DesignType> designType = designTypeRepository.findById(id);
+ Map<Integer, String> map = new HashMap<>();
+ if (designType.isPresent()) {
+ Set<Db> dbs = designType.get().getDbType().getDbs();
+ for (Db item : dbs) {
+ map.put(item.getId(), item.getName());
+ }
+ }
+ return map;
+ }
+
//Create a DB
@PostMapping("")
@ResponseBody
@@ -96,11 +103,11 @@ public class DbController {
return null;
}
- Db oldDb = dbService.getDb(dbConfig.getName());
+/* Db oldDb = dbService.getDb(dbConfig.getName());
if (oldDb != null) {
sendError(response, 400, "Db already exists: " + dbConfig.getName());
return null;
- } else {
+ } else {*/
Db newdb = new Db();
newdb.setName(dbConfig.getName());
newdb.setHost(dbConfig.getHost());
@@ -110,7 +117,7 @@ public class DbController {
newdb.setPass(dbConfig.getPassword());
newdb.setEncrypt(dbConfig.isEncrypt());
- if(!dbConfig.getName().equals("Elecsticsearch") || !dbConfig.getName().equals("Druid"))
+		if(!dbConfig.getName().equals("Elecsticsearch") && !dbConfig.getName().equals("Druid"))
{
newdb.setDatabase(new String(dbConfig.getDatabase()));
}
@@ -122,7 +129,7 @@ public class DbController {
retBody.setReturnBody(retMsg);
retBody.setStatusCode(200);
return retBody;
- }
+ //}
}
//Show a db
@@ -132,10 +139,6 @@ public class DbController {
@ResponseBody
@ApiOperation(value="Get a database's details.")
public Db getDb(@PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
- /*Db db = dbService.getDb(dbName);
- if (db == null) {
- sendError(response, 404, "Db not found: " + dbName);
- }*/
Db db = dbRepository.findByName(dbName);
if (db == null) {
sendError(response, 404, "Db not found: " + dbName);
@@ -144,51 +147,6 @@ public class DbController {
}
- //Update Db
- @PutMapping("/{dbName}")
- @ResponseBody
- @ApiOperation(value="Update a database.")
- public PostReturnBody<DbConfig> updateDb(@PathVariable("dbName") String dbName, @RequestBody DbConfig dbConfig, BindingResult result, HttpServletResponse response) throws IOException {
-
- if (result.hasErrors()) {
- sendError(response, 400, "Error parsing DB: " + result.toString());
- return null;
- }
-
- if(!dbName.equals(dbConfig.getName()))
- {
- sendError(response, 400, "Mismatch DB name.");
- return null;
- }
-
- Db oldDb = dbService.getDb(dbConfig.getName());
- if (oldDb == null) {
- sendError(response, 404, "Db not found: " + dbConfig.getName());
- return null;
- } else {
- oldDb.setName(dbConfig.getName());
- oldDb.setHost(dbConfig.getHost());
- oldDb.setPort(dbConfig.getPort());
- oldDb.setEnabled(dbConfig.isEnabled());
- oldDb.setLogin(dbConfig.getLogin());
- oldDb.setPass(dbConfig.getPassword());
- oldDb.setEncrypt(dbConfig.isEncrypt());
-
- if(!oldDb.getName().equals("Elecsticsearch") || !oldDb.getName().equals("Druid"))
- {
- oldDb.setDatabase(dbConfig.getDatabase());
- }
- dbRepository.save(oldDb);
- DbConfig retMsg;
- PostReturnBody<DbConfig> retBody = new PostReturnBody<>();
- retMsg = new DbConfig();
- composeRetMessagefromDbConfig(oldDb, retMsg);
- retBody.setReturnBody(retMsg);
- retBody.setStatusCode(200);
- return retBody;
- }
- }
-
//Delete a db
//the topics are missing in the return, since we use @JsonBackReference on Db's topics
//need to use the following method to retrieve the topic list
@@ -214,20 +172,56 @@ public class DbController {
@ResponseBody
@ApiOperation(value="Get a database's all topics.")
public Set<Topic> getDbTopics(@PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
- //Db db = dbService.getDb(dbName);
Set<Topic> topics;
try {
Db db = dbRepository.findByName(dbName);
topics = db.getTopics();
- }catch(Exception ex)
- {
+ } catch(Exception ex) {
sendError(response, 404, "DB: " + dbName + " or Topics not found");
- return null;
+ return Collections.emptySet();
}
return topics;
}
+ //Update Db
+ @PutMapping("")
+ @ResponseBody
+ @ApiOperation(value="Update a database.")
+ public PostReturnBody<DbConfig> updateDb(@RequestBody DbConfig dbConfig, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: " + result.toString());
+ return null;
+ }
+
+		Db oldDb = dbRepository.findById(dbConfig.getId()).orElse(null);
+ if (oldDb == null) {
+ sendError(response, 404, "Db not found: " + dbConfig.getName());
+ return null;
+ } else {
+ oldDb.setHost(dbConfig.getHost());
+ oldDb.setPort(dbConfig.getPort());
+ oldDb.setEnabled(dbConfig.isEnabled());
+ oldDb.setLogin(dbConfig.getLogin());
+ oldDb.setPass(dbConfig.getPassword());
+ oldDb.setEncrypt(dbConfig.isEncrypt());
+		if (!oldDb.getName().equals("Elecsticsearch") && !oldDb.getName().equals("Druid")) {
+ oldDb.setDatabase(dbConfig.getDatabase());
+ }
+
+ dbRepository.save(oldDb);
+ DbConfig retMsg;
+ PostReturnBody<DbConfig> retBody = new PostReturnBody<>();
+ retMsg = new DbConfig();
+ composeRetMessagefromDbConfig(oldDb, retMsg);
+ retBody.setReturnBody(retMsg);
+ retBody.setStatusCode(200);
+ return retBody;
+ }
+
+ }
+
@PostMapping("/verify")
@ResponseBody
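
Note that Spring Data's findById returns an Optional, so an unguarded .get() in the new updateDb would throw NoSuchElementException for an unknown id before the 404 branch could run; the orElse(null) form used above keeps that branch reachable. A standalone sketch of the difference, with a map-backed lookup standing in for the repository:

    import java.util.Map;
    import java.util.Optional;

    public class OptionalLookupDemo {
        // stand-in for DbRepository.findById
        static Optional<String> findById(Map<Integer, String> table, int id) {
            return Optional.ofNullable(table.get(id));
        }

        public static void main(String[] args) {
            Map<Integer, String> table = Map.of(1, "Elasticsearch");
            // orElse(null) keeps the controller's "Db not found" branch reachable;
            // calling get() on the second lookup would throw NoSuchElementException
            String hit = findById(table, 1).orElse(null);
            String miss = findById(table, 2).orElse(null);
            System.out.println(hit + " / " + miss); // Elasticsearch / null
        }
    }
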
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java
new file mode 100644
index 00000000..ebe60502
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java
@@ -0,0 +1,168 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller;
+
+import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.domain.Design;
+import org.onap.datalake.feeder.dto.DesignConfig;
+import org.onap.datalake.feeder.repository.DesignRepository;
+import org.onap.datalake.feeder.service.DesignService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.*;
+
+import io.swagger.annotations.ApiOperation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.servlet.http.HttpServletResponse;
+
+
+/**
+ * This controller manages design settings
+ *
+ * @author guochunmeng
+ */
+@RestController
+@RequestMapping(value = "/designs", produces = MediaType.APPLICATION_JSON_VALUE)
+public class DesignController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DesignRepository designRepository;
+
+ @Autowired
+ private DesignService designService;
+
+ @PostMapping("")
+ @ResponseBody
+ @ApiOperation(value="Create a design.")
+ public PostReturnBody<DesignConfig> createDesign(@RequestBody DesignConfig designConfig, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DesignConfig: "+result.toString());
+ return null;
+ }
+
+ Design design = null;
+ try {
+ design = designService.fillDesignConfiguration(designConfig);
+ } catch (Exception e) {
+ log.debug("FillDesignConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillDesignConfiguration: "+e.getMessage());
+ return null;
+ }
+ designRepository.save(design);
+ log.info("Design save successed");
+ return mkPostReturnBody(200, design);
+ }
+
+
+ @PutMapping("{id}")
+ @ResponseBody
+ @ApiOperation(value="Update a design.")
+ public PostReturnBody<DesignConfig> updateDesign(@RequestBody DesignConfig designConfig, BindingResult result, @PathVariable Integer id, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DesignConfig: "+result.toString());
+ return null;
+ }
+
+ Design design = designService.getDesign(id);
+ if (design != null) {
+ try {
+ designService.fillDesignConfiguration(designConfig, design);
+ } catch (Exception e) {
+ log.debug("FillDesignConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillDesignConfiguration: "+e.getMessage());
+ return null;
+ }
+ designRepository.save(design);
+ log.info("Design update successed");
+ return mkPostReturnBody(200, design);
+ } else {
+ sendError(response, 400, "Design not found: "+id);
+ return null;
+ }
+
+ }
+
+
+ @DeleteMapping("/{id}")
+ @ResponseBody
+ @ApiOperation(value="delete a design.")
+ public void deleteDesign(@PathVariable("id") Integer id, HttpServletResponse response) throws IOException{
+
+ Design oldDesign = designService.getDesign(id);
+ if (oldDesign == null) {
+ sendError(response, 400, "design not found "+id);
+ } else {
+ designRepository.delete(oldDesign);
+ response.setStatus(204);
+ }
+ }
+
+
+ @GetMapping("")
+ @ResponseBody
+ @ApiOperation(value="List all Designs")
+ public List<DesignConfig> queryAllDesign(){
+ return designService.queryAllDesign();
+ }
+
+
+ @PostMapping("/deploy/{id}")
+ @ResponseBody
+ @ApiOperation(value="Design deploy")
+ public Map<Integer, Boolean> deployDesign(@PathVariable Integer id, HttpServletResponse response) throws IOException {
+
+ Optional<Design> designOptional = designRepository.findById(id);
+ if (designOptional.isPresent()) {
+ Design design = designOptional.get();
+ return designService.deploy(design);
+ } else {
+ sendError(response, 400, "Design is null");
+ return new HashMap<>();
+ }
+ }
+
+
+ private PostReturnBody<DesignConfig> mkPostReturnBody(int statusCode, Design design) {
+ PostReturnBody<DesignConfig> retBody = new PostReturnBody<>();
+ retBody.setStatusCode(statusCode);
+ retBody.setReturnBody(design.getDesignConfig());
+ return retBody;
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+
+} \ No newline at end of file
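
A quick way to exercise the new endpoint is a plain HTTP POST with a DesignConfig-shaped body. The sketch below is hedged: the base URL and port, the context path, and every field value are illustrative assumptions, not taken from the feeder's configuration; only the field names mirror the DesignConfig mapping in this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DesignApiDemo {
        public static void main(String[] args) throws Exception {
            // hypothetical payload; field names follow Design.getDesignConfig()
            String json = "{\"name\":\"demo\",\"body\":\"{}\",\"note\":\"test\","
                    + "\"topicName\":\"unauthenticated.SEC_FAULT_OUTPUT\","
                    + "\"designType\":\"KIBANA_DB\",\"dbs\":[1]}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:1680/datalake/v1/designs")) // assumed host/path
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(json))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
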
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java
new file mode 100755
index 00000000..35d206bb
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.DesignType;
+import org.onap.datalake.feeder.dto.DesignTypeConfig;
+import org.onap.datalake.feeder.service.DesignTypeService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+
+import io.swagger.annotations.ApiOperation;
+
+/**
+ * This controller manages designType settings
+ *
+ * @author guochunmeng
+ */
+@RestController
+@RequestMapping(value = "/designTypes", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DesignTypeController {
+
+ @Autowired
+ private DesignTypeService designTypeService;
+
+ @GetMapping("")
+ @ResponseBody
+ @ApiOperation(value="List all designTypes")
+ public List<DesignTypeConfig> getDesignType() {
+ return designTypeService.getDesignTypes();
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 6a44c4f2..d9080ec0 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -58,9 +58,12 @@ public class FeederController {
@ResponseBody
@ApiOperation(value="Start pulling data.")
public String start() throws IOException {
- log.info("DataLake feeder starting to pull data from DMaaP...");
+ log.info("Going to start DataLake feeder ...");
if(pullService.isRunning() == false) {
pullService.start();
+ log.info("DataLake feeder started.");
+ }else {
+ log.info("DataLake feeder already started.");
}
return "{\"running\": true}";
}
@@ -72,11 +75,14 @@ public class FeederController {
@ResponseBody
@ApiOperation(value="Stop pulling data.")
public String stop() {
+ log.info("Going to stop DataLake feeder ...");
if(pullService.isRunning() == true)
{
pullService.shutdown();
+ log.info("DataLake feeder is stopped.");
+ }else {
+ log.info("DataLake feeder already stopped.");
}
- log.info("DataLake feeder is stopped.");
return "{\"running\": false}";
}
/**
@@ -86,7 +92,7 @@ public class FeederController {
@ApiOperation(value="Retrieve feeder status.")
public String status() {
String status = "Feeder is running: "+pullService.isRunning();
- log.info("sending feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+ log.info("sending feeder status ..." + status);//TODO we can send what topics are monitored, how many messages are sent, etc.
return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}";
}
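
Both handlers now guard on isRunning() so repeated calls only log instead of re-driving the service. A condensed sketch of that idempotent start/stop contract; the class below is illustrative, not PullService itself:

    public class IdempotentStartStopDemo {
        private boolean running;

        public synchronized String start() {
            if (!running) {
                running = true;                 // pullService.start()
                System.out.println("DataLake feeder started.");
            } else {
                System.out.println("DataLake feeder already started.");
            }
            return "{\"running\": true}";
        }

        public synchronized String stop() {
            if (running) {
                running = false;                // pullService.shutdown()
                System.out.println("DataLake feeder is stopped.");
            } else {
                System.out.println("DataLake feeder already stopped.");
            }
            return "{\"running\": false}";
        }
    }
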
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
new file mode 100644
index 00000000..8d1bf316
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
@@ -0,0 +1,149 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller;
+
+import io.swagger.annotations.ApiOperation;
+import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.KafkaConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
+import org.onap.datalake.feeder.service.KafkaService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This controller manages kafka settings
+ *
+ * @author guochunmeng
+ */
+@RestController
+@RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class KafkaController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private KafkaService kafkaService;
+
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ @PostMapping("")
+ @ResponseBody
+ @ApiOperation(value="Create a kafka.")
+ public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
+ return null;
+ }
+
+ Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId());
+
+ if (oldKafka != null) {
+ sendError(response, 400, "kafka is exist "+kafkaConfig.getId());
+ return null;
+ } else {
+ Kafka kafka = null;
+ try {
+ kafka = kafkaService.fillKafkaConfiguration(kafkaConfig);
+ } catch (Exception e) {
+ log.debug("FillKafkaConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
+ return null;
+ }
+ kafkaRepository.save(kafka);
+ log.info("Kafka save successed");
+ return mkPostReturnBody(200, kafka);
+ }
+ }
+
+ @PutMapping("/{id}")
+ @ResponseBody
+ @ApiOperation(value="Update a kafka.")
+ public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
+ return null;
+ }
+
+ Kafka oldKafka = kafkaService.getKafkaById(id);
+
+ if (oldKafka == null) {
+ sendError(response, 400, "Kafka not found: "+id);
+ return null;
+ } else {
+ try {
+ kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka);
+ } catch (Exception e) {
+ log.debug("FillKafkaConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
+ return null;
+ }
+ kafkaRepository.save(oldKafka);
+ log.info("kafka update successed");
+ return mkPostReturnBody(200, oldKafka);
+ }
+ }
+
+ @DeleteMapping("/{id}")
+ @ResponseBody
+ @ApiOperation(value="delete a kafka.")
+ public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
+
+ Kafka oldKafka = kafkaService.getKafkaById(id);
+ if (oldKafka == null) {
+ sendError(response, 400, "kafka not found "+id);
+ } else {
+ kafkaRepository.delete(oldKafka);
+ response.setStatus(204);
+ }
+ }
+
+ @GetMapping("")
+ @ResponseBody
+ @ApiOperation(value="List all Kafkas")
+ public List<KafkaConfig> queryAllKafka(){
+ return kafkaService.getAllKafka();
+ }
+
+ private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) {
+ PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>();
+ retBody.setStatusCode(statusCode);
+ retBody.setReturnBody(kafka.getKafkaConfig());
+ return retBody;
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+
+}
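
The log.debug calls above now pass the exception message through a {} placeholder. As a general slf4j note: an extra argument without a matching placeholder is silently dropped, while passing the Throwable itself as the last argument preserves the stack trace. A minimal illustration:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingDemo {
        private static final Logger log = LoggerFactory.getLogger(LoggingDemo.class);

        public static void main(String[] args) {
            Exception e = new IllegalStateException("boom");
            // Throwable as last argument: logs the message plus the stack trace
            log.debug("FillKafkaConfiguration failed", e);
            // placeholder form: logs the message text only
            log.debug("FillKafkaConfiguration failed: {}", e.getMessage());
        }
    }
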
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 88f573a1..b59b2a7b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -27,17 +27,18 @@ import java.util.Set;
import javax.servlet.http.HttpServletResponse;
import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
import org.onap.datalake.feeder.service.TopicService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.DeleteMapping;
@@ -50,6 +51,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
+
import io.swagger.annotations.ApiOperation;
/**
@@ -71,18 +73,23 @@ public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private DmaapService dmaapService;
+ private ApplicationContext context;
@Autowired
+ private KafkaRepository kafkaRepository;
+
+ @Autowired
private TopicRepository topicRepository;
@Autowired
private TopicService topicService;
- @GetMapping("/dmaap")
+ @GetMapping("/dmaap/{kafkaId}")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics() {
+ public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) {
+ Kafka kafka = kafkaRepository.findById(kafkaId).get();
+ DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
return dmaapService.getTopics();
}
@@ -94,7 +101,7 @@ public class TopicController {
List<String> retString = new ArrayList<>();
for(Topic item : ret)
{
- if(!topicService.istDefaultTopic(item))
+ if(!topicService.isDefaultTopic(item))
retString.add(item.getName());
}
return retString;
@@ -109,24 +116,25 @@ public class TopicController {
sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ /*Topic oldTopic = topicService.getTopic(topicConfig.getName());
if (oldTopic != null) {
sendError(response, 400, "Topic already exists "+topicConfig.getName());
return null;
- } else {
+ } else {*/
Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
if(wTopic.getTtl() == 0)
wTopic.setTtl(3650);
topicRepository.save(wTopic);
return mkPostReturnBody(200, wTopic);
- }
+ //}
+ //FIXME need to connect to Kafka
}
- @GetMapping("/{topicName}")
+ @GetMapping("/{topicId}")
@ResponseBody
@ApiOperation(value="Get a topic's settings.")
- public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException {
- Topic topic = topicService.getTopic(topicName);
+ public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicId);
if(topic == null) {
sendError(response, 404, "Topic not found");
return null;
@@ -136,23 +144,23 @@ public class TopicController {
//This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
//One exception is that old DBs are kept
- @PutMapping("/{topicName}")
+ @PutMapping("/{topicId}")
@ResponseBody
@ApiOperation(value="Update a topic.")
- public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
+ public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- if(!topicName.equals(topicConfig.getName()))
+ if(topicId!=topicConfig.getId())
{
- sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName());
+ sendError(response, 400, "Topic name mismatch" + topicId + topicConfig);
return null;
}
- Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ Topic oldTopic = topicService.getTopic(topicId);
if (oldTopic == null) {
sendError(response, 404, "Topic not found "+topicConfig.getName());
return null;
@@ -163,14 +171,14 @@ public class TopicController {
}
}
- @DeleteMapping("/{topicName}")
+ @DeleteMapping("/{topicId}")
@ResponseBody
- @ApiOperation(value="Update a topic.")
- public void deleteTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException
+ @ApiOperation(value="Delete a topic.")
+ public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException
{
- Topic oldTopic = topicService.getTopic(topicName);
+ Topic oldTopic = topicService.getTopic(topicId);
if (oldTopic == null) {
- sendError(response, 404, "Topic not found "+topicName);
+ sendError(response, 404, "Topic not found "+topicId);
} else {
Set<Db> dbRelation = oldTopic.getDbs();
dbRelation.clear();
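
Since each DmaapService instance is now tied to one Kafka cluster, the controller obtains it with context.getBean(DmaapService.class, kafka) instead of field injection, which relies on the service being a prototype-scoped bean. A self-contained sketch of that pattern; the stub classes below are stand-ins, not the feeder's types:

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;

    public class PrototypeBeanDemo {

        static class KafkaStub {
            final String name;
            KafkaStub(String name) { this.name = name; }
        }

        // stand-in for DmaapService: one instance per Kafka cluster
        static class DmaapServiceStub {
            final KafkaStub kafka;
            DmaapServiceStub(KafkaStub kafka) { this.kafka = kafka; }
        }

        @Configuration
        static class Config {
            @Bean
            @Scope("prototype") // a fresh instance per getBean call
            DmaapServiceStub dmaapService(KafkaStub kafka) {
                return new DmaapServiceStub(kafka);
            }
        }

        public static void main(String[] args) {
            try (AnnotationConfigApplicationContext context =
                    new AnnotationConfigApplicationContext(Config.class)) {
                // explicit getBean arguments are only supported for prototype beans
                DmaapServiceStub svc = context.getBean(DmaapServiceStub.class, new KafkaStub("main"));
                System.out.println(svc.kafka.name); // main
            }
        }
    }
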
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index da1f6cab..cfd2462b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -24,11 +24,17 @@ import java.util.Set;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
import javax.persistence.Table;
+
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+
import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
@@ -46,10 +52,14 @@ import lombok.Setter;
@Table(name = "db")
public class Db {
@Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ @Column(name = "`id`")
+ private int id;
+
@Column(name="`name`")
private String name;
- @Column(name="`enabled`")
+ @Column(name="`enabled`", nullable = false)
private boolean enabled;
@Column(name="`host`")
@@ -79,19 +89,49 @@ public class Db {
@Column(name="`property3`")
private String property3;
+ @ManyToOne(fetch = FetchType.EAGER)
+ @JoinColumn(name = "db_type_id", nullable = false)
+ private DbType dbType;
+
@JsonBackReference
@ManyToMany(fetch = FetchType.EAGER)
@JoinTable( name = "map_db_topic",
- joinColumns = { @JoinColumn(name="db_name") },
- inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ joinColumns = { @JoinColumn(name="db_id") },
+ inverseJoinColumns = { @JoinColumn(name="topic_id") }
)
- protected Set<Topic> topics;
+ private Set<Topic> topics;
+
+ public boolean isTool() {
+ return dbType.isTool();
+ }
+
+ public boolean isHdfs() {
+ return isDb(DbTypeEnum.HDFS);
+ }
+
+ public boolean isElasticsearch() {
+ return isDb(DbTypeEnum.ES);
+ }
+
+ public boolean isCouchbase() {
+ return isDb(DbTypeEnum.CB);
+ }
- public Db() {
+ public boolean isDruid() {
+ return isDb(DbTypeEnum.DRUID);
}
- public Db(String name) {
- this.name = name;
+ public boolean isMongoDB() {
+ return isDb(DbTypeEnum.MONGO);
+ }
+
+ private boolean isDb(DbTypeEnum dbTypeEnum) {
+ return dbTypeEnum.equals(DbTypeEnum.valueOf(dbType.getId()));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
}
@Override
@@ -102,11 +142,11 @@ public class Db {
if (this.getClass() != obj.getClass())
return false;
- return name.equals(((Db) obj).getName());
+ return id==((Db) obj).getId();
}
@Override
public int hashCode() {
- return name.hashCode();
+ return id;
}
}
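
Db's new isDb helper maps the DbType row id onto DbTypeEnum via valueOf, which assumes the db_type primary keys are the enum constant names (ES, CB, DRUID, MONGO, HDFS, per the helper methods above). A sketch of that lookup with an assumed enum body:

    public class DbTypeEnumDemo {
        enum DbTypeEnum { CB, DRUID, ES, HDFS, MONGO }

        // mirrors Db.isDb: the db_type id is assumed to be an enum constant name
        static boolean isDb(String dbTypeId, DbTypeEnum expected) {
            return expected == DbTypeEnum.valueOf(dbTypeId);
        }

        public static void main(String[] args) {
            System.out.println(isDb("ES", DbTypeEnum.ES));     // true
            System.out.println(isDb("MONGO", DbTypeEnum.ES));  // false
            // an id that is not a constant name makes valueOf throw IllegalArgumentException
        }
    }
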
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
new file mode 100644
index 00000000..9c83a9cd
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
@@ -0,0 +1,92 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import lombok.Getter;
+import lombok.Setter;
+
+
+/**
+ * Domain class representing big data storage type
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db_type")
+public class DbType {
+ @Id
+ @Column(name="`id`")
+ private String id;
+
+ @Column(name="`name`", nullable = false)
+ private String name;
+
+ @Column(name="`default_port`")
+ private Integer defaultPort;
+
+ @Column(name="`tool`", nullable = false)
+ private boolean tool;
+
+ @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "dbType")
+ protected Set<Db> dbs = new HashSet<>();
+
+ public DbType() {
+ }
+
+ public DbType(String id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DbType %s (name=%s)", id, name);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return id.equals(((DbType) obj).getId());
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java
new file mode 100644
index 00000000..faf3755e
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.domain;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.persistence.*;
+
+import org.onap.datalake.feeder.dto.DesignConfig;
+
+/**
+ * Domain class representing design
+ *
+ * @author guochunmeng
+ */
+
+@Getter
+@Setter
+@Entity
+@Table(name = "design")
+public class Design {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ @Column(name = "`id`")
+ private Integer id;
+
+ @Column(name = "`name`")
+ private String name;
+
+ @ManyToOne(fetch = FetchType.EAGER)
+ @JoinColumn(name = "topic_name_id", nullable = false)
+ private TopicName topicName;//topic name
+
+ @Column(name = "`submitted`")
+ private Boolean submitted;
+
+ @Column(name = "`body`")
+ private String body;
+
+ @Column(name = "`note`")
+ private String note;
+
+ @ManyToOne(fetch=FetchType.EAGER)
+ @JoinColumn(name = "design_type_id", nullable = false)
+ @JsonBackReference
+ private DesignType designType;
+
+ //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+ @JsonBackReference
+ //@JsonManagedReference
+ @ManyToMany(fetch = FetchType.EAGER)
+ @JoinTable(name = "map_db_design", joinColumns = { @JoinColumn(name = "design_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
+ protected Set<Db> dbs;
+
+ public DesignConfig getDesignConfig() {
+
+ DesignConfig designConfig = new DesignConfig();
+
+ designConfig.setId(getId());
+ designConfig.setBody(getBody());
+ designConfig.setName(getName());
+ designConfig.setNote(getNote());
+ designConfig.setSubmitted(getSubmitted());
+ designConfig.setTopicName(getTopicName().getId());
+ designConfig.setDesignType(getDesignType().getId());
+ designConfig.setDesignTypeName(getDesignType().getName());
+
+ Set<Db> designDb = getDbs();
+ List<Integer> dbList = new ArrayList<>();
+ if (designDb != null) {
+ for (Db item : designDb) {
+ dbList.add(item.getId());
+ }
+ }
+ designConfig.setDbs(dbList);
+
+ return designConfig;
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java
new file mode 100644
index 00000000..14026fe0
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.domain;
+
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.datalake.feeder.dto.DesignTypeConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.persistence.*;
+
+/**
+ * Domain class representing design_type
+ *
+ * @author guochunmeng
+ */
+@Getter
+@Setter
+@Entity
+@Table(name = "design_type")
+public class DesignType {
+
+ @Id
+ @Column(name = "`id`")
+ private String id;
+
+ @Column(name = "`name`")
+ private String name;
+
+ @ManyToOne(fetch=FetchType.LAZY)
+ @JoinColumn(name="db_type_id", nullable = false)
+ @JsonBackReference
+ private DbType dbType;
+
+ @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "designType")
+ protected Set<Design> designs = new HashSet<>();
+
+ @Column(name = "`note`")
+ private String note;
+
+ public DesignTypeConfig getDesignTypeConfig() {
+
+ DesignTypeConfig designTypeConfig = new DesignTypeConfig();
+
+ designTypeConfig.setId(getId());
+ designTypeConfig.setName(getName());
+
+ return designTypeConfig;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
new file mode 100644
index 00000000..df7aad04
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
@@ -0,0 +1,64 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+/**
+ * A wrapper of the parent Topic
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class EffectiveTopic {
+ private Topic topic; //base Topic
+
+ String name;
+
+ public EffectiveTopic(Topic baseTopic) {
+ topic = baseTopic;
+ }
+
+ public EffectiveTopic(Topic baseTopic, String name ) {
+ topic = baseTopic;
+ this.name = name;
+ }
+
+ public String getName() {
+ return name==null?topic.getName():name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Topic getTopic() {
+ return topic;
+ }
+
+ public void setTopic(Topic topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("EffectiveTopic %s (base Topic=%s)", getName(), topic.toString());
+ }
+
+}
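
EffectiveTopic lets one configured Topic stand in for several concrete Kafka topic names: getName() returns the override when set and otherwise falls back to the base topic's name. A tiny sketch of that fallback, with a stub replacing the real Topic entity:

    public class EffectiveTopicDemo {
        static class TopicStub {
            String getName() { return "_DL_DEFAULT_"; } // assumed base topic name
        }

        // mirrors EffectiveTopic.getName(): explicit name wins, else the base topic's name
        static String effectiveName(TopicStub base, String override) {
            return override == null ? base.getName() : override;
        }

        public static void main(String[] args) {
            TopicStub base = new TopicStub();
            System.out.println(effectiveName(base, null));
            System.out.println(effectiveName(base, "unauthenticated.VES_MEASUREMENT_OUTPUT"));
        }
    }
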
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
new file mode 100644
index 00000000..de80db70
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -0,0 +1,147 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.datalake.feeder.dto.KafkaConfig;
+
+
+/**
+ * Domain class representing Kafka cluster
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "kafka")
+public class Kafka {
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ @Column(name = "`id`")
+ private int id;
+
+ @Column(name="`name`", nullable = false)
+ private String name;
+
+ @Column(name="`enabled`", nullable = false)
+ private boolean enabled;
+
+ @Column(name="broker_list", nullable = false)
+ private String brokerList;//message-router-kafka:9092,message-router-kafka2:9092
+
+ @Column(name="`zk`", nullable = false)
+ private String zooKeeper;//message-router-zookeeper:2181
+
+ @Column(name="`group`", columnDefinition = "varchar(255) DEFAULT 'datalake'")
+ private String group;
+
+ @Column(name="`secure`", columnDefinition = " bit(1) DEFAULT 0")
+ private boolean secure;
+
+ @Column(name="`login`")
+ private String login;
+
+ @Column(name="`pass`")
+ private String pass;
+
+ @Column(name="`security_protocol`")
+ private String securityProtocol;
+
+ //by default, all topics started with '__' are excluded, here one can explicitly include them
+ //example: '__consumer_offsets,__transaction_state'
+ @Column(name="`included_topic`")
+ private String includedTopic;
+
+ @Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'")
+ private String excludedTopic;
+
+ @Column(name="`consumer_count`", columnDefinition = "integer default 3")
+ private Integer consumerCount;
+
+ //don't show this field in admin UI
+ @Column(name="`timeout_sec`", columnDefinition = "integer default 10")
+ private Integer timeout;
+
+ @JsonBackReference
+ @ManyToMany(fetch = FetchType.EAGER)
+ @JoinTable( name = "map_kafka_topic",
+ joinColumns = { @JoinColumn(name="kafka_id") },
+ inverseJoinColumns = { @JoinColumn(name="topic_id") }
+ )
+ private Set<Topic> topics;
+
+ @Override
+ public String toString() {
+ return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return id == ((Kafka) obj).getId();
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ public KafkaConfig getKafkaConfig() {
+ KafkaConfig kafkaConfig = new KafkaConfig();
+
+ kafkaConfig.setId(getId());
+ kafkaConfig.setBrokerList(getBrokerList());
+ kafkaConfig.setConsumerCount(getConsumerCount());
+ kafkaConfig.setEnabled(isEnabled());
+ kafkaConfig.setExcludedTopic(getExcludedTopic());
+ kafkaConfig.setGroup(getGroup());
+ kafkaConfig.setIncludedTopic(getIncludedTopic());
+ kafkaConfig.setLogin(getLogin());
+ kafkaConfig.setName(getName());
+ kafkaConfig.setPass(getPass());
+ kafkaConfig.setSecure(isSecure());
+ kafkaConfig.setSecurityProtocol(getSecurityProtocol());
+ kafkaConfig.setTimeout(getTimeout());
+ kafkaConfig.setZooKeeper(getZooKeeper());
+
+ return kafkaConfig;
+ }
+}
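
The Kafka entity now carries everything a consumer needs per cluster: broker list, consumer group, credentials, and security protocol. A hedged sketch of mapping those fields onto standard Kafka client properties; the values and the SASL configuration line are illustrative, not taken from the feeder:

    import java.util.Properties;

    public class KafkaConsumerPropsDemo {
        public static void main(String[] args) {
            String brokerList = "message-router-kafka:9092";   // Kafka.getBrokerList()
            String group = "datalake";                          // Kafka.getGroup()
            boolean secure = false;                             // Kafka.isSecure()

            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", group);
            if (secure) {
                props.put("security.protocol", "SASL_PLAINTEXT"); // Kafka.getSecurityProtocol()
                props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"login\" password=\"pass\";");   // Kafka.getLogin()/getPass()
            }
            System.out.println(props);
        }
    }
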
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index acb48aef..5d0c7625 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -20,6 +20,7 @@
package org.onap.datalake.feeder.domain;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -30,9 +31,13 @@ import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
import javax.persistence.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -51,9 +56,13 @@ import lombok.Setter;
@Table(name = "topic")
public class Topic {
@Id
- @Column(name = "`name`")
- private String name;//topic name
+ @Column(name = "`id`")
+ private Integer id;
+ @ManyToOne(fetch = FetchType.EAGER)
+ @JoinColumn(name = "topic_name_id", nullable = false)
+ private TopicName topicName;//topic name
+
//for protected Kafka topics
@Column(name = "`login`")
private String login;
@@ -65,65 +74,59 @@ public class Topic {
@JsonBackReference
//@JsonManagedReference
@ManyToMany(fetch = FetchType.EAGER)
- @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") })
- protected Set<Db> dbs;
+ @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
+	protected Set<Db> dbs = new HashSet<>();
+
+ @ManyToMany(fetch = FetchType.EAGER)
+ @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
+	protected Set<Kafka> kafkas = new HashSet<>();
/**
* indicate if we should monitor this topic
*/
- @Column(name = "`enabled`")
- private Boolean enabled;
+ @Column(name = "`enabled`", nullable = false)
+ private boolean enabled;
/**
* save raw message text
*/
- @Column(name = "`save_raw`")
- private Boolean saveRaw;
+ @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+ private boolean saveRaw;
/**
* need to explicitly tell feeder the data format of the message. support JSON,
* XML, YAML, TEXT
*/
@Column(name = "`data_format`")
- private String dataFormat;
+ protected String dataFormat;
/**
* TTL in day
*/
+ @Column(name = "`ttl_day`")
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
- @Column(name = "`correlate_cleared_message`")
- private Boolean correlateClearedMessage;
+ @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+ private boolean correlateClearedMessage;
//paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
@Column(name = "`message_id_path`")
- private String messageIdPath;
+ protected String messageIdPath;
//paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
- @Column(name = "`aggregate_array_path`")
- private String aggregateArrayPath;
+ @Column(name = "`aggregate_array_path`")
+ protected String aggregateArrayPath;
//paths to the element in array that need flatten, this element is used as label, comma separated,
//example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
- @Column(name = "`flatten_array_path`")
- private String flattenArrayPath;
+ @Column(name = "`flatten_array_path`")
+ protected String flattenArrayPath;
- public Topic() {
- }
-
- public Topic(String name) {
- this.name = name;
- }
-
- public boolean isEnabled() {
- return is(enabled);
+ public String getName() {
+ return topicName.getId();
}
-
- public boolean isCorrelateClearedMessage() {
- return is(correlateClearedMessage);
- }
-
+
public int getTtl() {
if (ttl != null) {
return ttl;
@@ -132,25 +135,58 @@ public class Topic {
}
}
- private boolean is(Boolean b) {
- return is(b, false);
+ public DataFormat getDataFormat2() {
+ if (dataFormat != null) {
+ return DataFormat.fromString(dataFormat);
+ } else {
+ return null;
+ }
}
- private boolean is(Boolean b, boolean defaultValue) {
- if (b != null) {
- return b;
- } else {
- return defaultValue;
+ public String[] getAggregateArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(aggregateArrayPath)) {
+ ret = aggregateArrayPath.split(",");
}
+
+ return ret;
+ }
+
+ public String[] getFlattenArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(flattenArrayPath)) {
+ ret = flattenArrayPath.split(",");
+ }
+
+ return ret;
}
- public boolean isSaveRaw() {
- return is(saveRaw);
+	//extract DB id from JSON attributes; supports multiple attributes
+ public String getMessageId(JSONObject json) {
+ String ret = null;
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ ret = sb.toString();
+ }
+
+ return ret;
}
public TopicConfig getTopicConfig() {
TopicConfig tConfig = new TopicConfig();
+ tConfig.setId(getId());
tConfig.setName(getName());
tConfig.setLogin(getLogin());
tConfig.setEnabled(isEnabled());
@@ -161,21 +197,35 @@ public class Topic {
tConfig.setAggregateArrayPath(getAggregateArrayPath());
tConfig.setFlattenArrayPath(getFlattenArrayPath());
tConfig.setTtl(getTtl());
+
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
+ List<String> enabledDbList = new ArrayList<>();
if (topicDb != null) {
for (Db item : topicDb) {
dbList.add(item.getName());
+ if(item.isEnabled()) {
+ enabledDbList.add(item.getName());
+ }
}
}
tConfig.setSinkdbs(dbList);
+ tConfig.setEnabledSinkdbs(enabledDbList);
+ Set<Kafka> topicKafka = getKafkas();
+ List<Integer> kafkaList = new ArrayList<>();
+ if (topicKafka != null) {
+ for (Kafka kafka : topicKafka) {
+ kafkaList.add(kafka.getId());
+ }
+ }
+ tConfig.setKafkas(kafkaList);
return tConfig;
}
@Override
public String toString() {
- return name;
+ return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas);
}
@Override
@@ -186,12 +236,12 @@ public class Topic {
if (this.getClass() != obj.getClass())
return false;
- return name.equals(((Topic) obj).getName());
+ return id.equals(((Topic) obj).getId());
}
@Override
public int hashCode() {
- return name.hashCode();
+ return id;
}
}
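To make the relocated getMessageId() concrete: messageIdPath holds comma-separated JSON Pointer expressions, and the values found at each pointer are joined with '^'. An illustrative call with invented values (the setter comes from Lombok):

    Topic topic = new Topic();
    topic.setMessageIdPath("/event-header/id,/event-header/entity-type");
    JSONObject json = new JSONObject("{\"event-header\":{\"id\":\"101\",\"entity-type\":\"alarm\"}}");
    String messageId = topic.getMessageId(json); // yields "101^alarm"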
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java
new file mode 100644
index 00000000..83227ada
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java
@@ -0,0 +1,86 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+
+import java.util.Set;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Domain class representing unique topic names
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "topic_name")
+public class TopicName {
+ @Id
+ @Column(name = "`id`")
+ private String id;//topic name
+
+
+ @OneToMany(fetch = FetchType.LAZY, mappedBy = "topicName")
+ protected Set<Design> designs;
+
+
+ @OneToMany(fetch = FetchType.LAZY, mappedBy = "topicName")
+ protected Set<Topic> topics;
+
+ public TopicName() {
+ }
+
+ public TopicName(String name) {
+ id = name;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicName "+ id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return id.equals(((TopicName) obj).getId());
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
index 557b545c..eff87114 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.controller.domain;
+package org.onap.datalake.feeder.dto;
import lombok.Getter;
@@ -33,6 +33,7 @@ import lombok.Setter;
@Getter
@Setter
public class DbConfig {
+ private int id;
private String name;
private String host;
private boolean enabled;
@@ -40,6 +41,6 @@ public class DbConfig {
private String password;
private boolean encrypt;
private String database;
- private int port;
+ private Integer port;
private String poperties;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java
new file mode 100755
index 00000000..34256004
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * JSON request body for Design config.
+ *
+ * @author guochunmeng
+ */
+
+@Getter
+@Setter
+public class DesignConfig {
+
+ private Integer id;
+ private String name;
+ private Boolean submitted;
+ private String body;
+ private String note;
+ private String topicName;
+ private String designType;
+	private String designTypeName;//name shown in the UI
+ private List<Integer> dbs;
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java
new file mode 100644
index 00000000..ddedf38b
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 QCT
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * JSON request body for DesignType Config.
+ *
+ * @author guochunmeng
+ *
+ */
+@Setter
+@Getter
+public class DesignTypeConfig {
+
+ private String id;
+ private String name;
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
new file mode 100644
index 00000000..f5e9539c
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 QCT
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * JSON request body for Kafka Config.
+ *
+ * @author guochunmeng
+ *
+ */
+@Getter
+@Setter
+public class KafkaConfig {
+
+ private int id;
+
+ private String name;
+
+ private boolean enabled;
+
+ private String brokerList;
+
+ private String zooKeeper;
+
+ private String group;
+
+ private boolean secure;
+
+ private String login;
+
+ private String pass;
+
+ private String securityProtocol;
+
+ private String includedTopic;
+
+ private String excludedTopic;
+
+ private Integer consumerCount;
+
+ private Integer timeout;
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 8dfe1b16..6a262ca8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -25,10 +25,6 @@ import lombok.Setter;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
-import org.onap.datalake.feeder.enumeration.DataFormat;
-
/**
* JSON request body for Topic manipulation.
*
@@ -41,10 +37,12 @@ import org.onap.datalake.feeder.enumeration.DataFormat;
public class TopicConfig {
+ private int id;
private String name;
private String login;
private String password;
private List<String> sinkdbs;
+	private List<String> enabledSinkdbs;//only includes enabled DBs
private boolean enabled;
private boolean saveRaw;
private String dataFormat;
@@ -53,82 +51,11 @@ public class TopicConfig {
private String messageIdPath;
private String aggregateArrayPath;
private String flattenArrayPath;
-
- public DataFormat getDataFormat2() {
- if (dataFormat != null) {
- return DataFormat.fromString(dataFormat);
- } else {
- return null;
- }
- }
-
- public boolean supportHdfs() {
- return containDb("HDFS");
- }
-
- public boolean supportElasticsearch() {
- return containDb("Elasticsearch");//TODO string hard codes
- }
-
- public boolean supportCouchbase() {
- return containDb("Couchbase");
- }
-
- public boolean supportDruid() {
- return containDb("Druid");
- }
-
- public boolean supportMongoDB() {
- return containDb("MongoDB");
- }
-
- private boolean containDb(String dbName) {
- return (sinkdbs != null && sinkdbs.contains(dbName));
- }
-
- //extract DB id from JSON attributes, support multiple attributes
- public String getMessageId(JSONObject json) {
- String id = null;
-
- if (StringUtils.isNotBlank(messageIdPath)) {
- String[] paths = messageIdPath.split(",");
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < paths.length; i++) {
- if (i > 0) {
- sb.append('^');
- }
- sb.append(json.query(paths[i]).toString());
- }
- id = sb.toString();
- }
-
- return id;
- }
-
- public String[] getAggregateArrayPath2() {
- String[] ret = null;
-
- if (StringUtils.isNotBlank(aggregateArrayPath)) {
- ret = aggregateArrayPath.split(",");
- }
-
- return ret;
- }
-
- public String[] getFlattenArrayPath2() {
- String[] ret = null;
-
- if (StringUtils.isNotBlank(flattenArrayPath)) {
- ret = flattenArrayPath.split(",");
- }
-
- return ret;
- }
-
+ private List<Integer> kafkas;
+
@Override
public String toString() {
- return name;
+ return String.format("TopicConfig %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
}
@Override
@@ -139,12 +66,12 @@ public class TopicConfig {
if (this.getClass() != obj.getClass())
return false;
- return name.equals(((TopicConfig) obj).getName());
+		return id == ((TopicConfig) obj).getId();
}
@Override
public int hashCode() {
- return name.hashCode();
+ return id;
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
new file mode 100644
index 00000000..39d02d36
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
@@ -0,0 +1,54 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+
+/**
+ * Database type
+ *
+ * @author Guobiao Mo
+ *
+ */
+public enum DbTypeEnum {
+ CB("Couchbase", CouchbaseService.class)
+ , DRUID("Druid", null)
+ , ES("Elasticsearch", ElasticsearchService.class)
+ , HDFS("HDFS", HdfsService.class)
+ , MONGO("MongoDB", MongodbService.class)
+ , KIBANA("Kibana", null)
+ , SUPERSET("Superset", null);
+
+ private final String name;
+ private final Class<? extends DbStoreService> serviceClass;
+
+ DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) {
+ this.name = name;
+ this.serviceClass = serviceClass;
+ }
+
+ public Class<? extends DbStoreService> getServiceClass(){
+ return serviceClass;
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
new file mode 100644
index 00000000..157fbf94
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
@@ -0,0 +1,38 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+/**
+ * Design type
+ *
+ * @author Guobiao Mo
+ *
+ */
+public enum DesignTypeEnum {
+ KIBANA_DB("Kibana Dashboard"), KIBANA_SEARCH("Kibana Search"), KIBANA_VISUAL("Kibana Visualization"),
+ ES_MAPPING("Elasticsearch Field Mapping Template"), DRUID_KAFKA_SPEC("Druid Kafka Indexing Service Supervisor Spec");
+
+ private final String name;
+
+ DesignTypeEnum(String name) {
+ this.name = name;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
index b09dcdca..a744da6f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
@@ -31,7 +31,7 @@ import org.springframework.data.repository.CrudRepository;
*
*/
-public interface DbRepository extends CrudRepository<Db, String> {
+public interface DbRepository extends CrudRepository<Db, Integer> {
Db findByName(String Name);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java
new file mode 100644
index 00000000..b93cb1d1
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.DbType;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * DbType Repository
+ *
+ * @author Guobiao Mo
+ */
+
+public interface DbTypeRepository extends CrudRepository<DbType, String> {
+
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java
new file mode 100644
index 00000000..f144e905
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Design;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * Design Repository
+ *
+ * @author guochunmeng
+ */
+
+public interface DesignRepository extends CrudRepository<Design, Integer> {
+
+ Design findByName(String name);
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java
new file mode 100755
index 00000000..e7ab48a2
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.DesignType;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * DesignType Repository
+ *
+ * @author guochunmeng
+ */
+
+public interface DesignTypeRepository extends CrudRepository<DesignType, String> {
+
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
new file mode 100644
index 00000000..6ce23ba7
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
@@ -0,0 +1,35 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Kafka;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * Kafka Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
new file mode 100644
index 00000000..9f8ea8a9
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
@@ -0,0 +1,35 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.TopicName;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * TopicName Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface TopicNameRepository extends CrudRepository<TopicName, String> {
+
+}
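A plausible usage sketch for this repository (topic string invented): since Topic now references TopicName by foreign key, a row can be created on first sight of a new Kafka topic:

    TopicName topicName = topicNameRepository.findById("unauthenticated.SEC_FAULT_OUTPUT")
            .orElseGet(() -> topicNameRepository.save(new TopicName("unauthenticated.SEC_FAULT_OUTPUT")));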
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 2d9adef8..8f72dfed 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -31,6 +31,6 @@ import org.springframework.data.repository.CrudRepository;
*
*/
-public interface TopicRepository extends CrudRepository<Topic, String> {
-
+public interface TopicRepository extends CrudRepository<Topic, Integer> {
+ //List<Topic> findByTopicName(String topicStr);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index 58bb433a..d54bf3f4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -20,11 +20,17 @@
package org.onap.datalake.feeder.service;
-import java.util.Optional;
+import java.util.HashMap;
+import java.util.Map;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -35,33 +41,32 @@ import org.springframework.stereotype.Service;
*/
@Service
public class DbService {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private DbRepository dbRepository;
+ private ApplicationContext context;
- public Db getDb(String name) {
- Optional<Db> ret = dbRepository.findById(name);
- return ret.isPresent() ? ret.get() : null;
- }
-
- public Db getCouchbase() {
- return getDb("Couchbase");
- }
-
- public Db getElasticsearch() {
- return getDb("Elasticsearch");
- }
+ private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
- public Db getMongoDB() {
- return getDb("MongoDB");
- }
+ public DbStoreService findDbStoreService(Db db) {
+ int dbId = db.getId();
+ if (dbStoreServiceMap.containsKey(dbId)) {
+ return dbStoreServiceMap.get(dbId);
+ }
- public Db getDruid() {
- return getDb("Druid");
- }
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass();
+
+ if (serviceClass == null) {
+ log.error("Should not have come here {}", db);
+ dbStoreServiceMap.put(dbId, null);
+ return null;
+ }
+
+ DbStoreService ret = context.getBean(serviceClass, db);
+ dbStoreServiceMap.put(dbId, ret);
- public Db getHdfs() {
- return getDb("HDFS");
+ return ret;
}
-
}
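DbService now acts as a registry: it resolves one DbStoreService bean per DB row via the prototype bean lookup and caches it by DB id. A hedged usage sketch; the DbStoreService interface itself is not shown in this patch, so the store call is only indicated:

    for (Db db : topic.getDbs()) {
        if (db.isEnabled()) {
            DbStoreService store = dbService.findDbStoreService(db);
            if (store != null) {
                // hand parsed messages to the sink; the exact method lives on DbStoreService
            }
        }
    }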
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java
new file mode 100755
index 00000000..d4924972
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java
@@ -0,0 +1,272 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Design;
+import org.onap.datalake.feeder.domain.DesignType;
+import org.onap.datalake.feeder.domain.TopicName;
+import org.onap.datalake.feeder.dto.DesignConfig;
+import org.onap.datalake.feeder.enumeration.DesignTypeEnum;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.DesignTypeRepository;
+import org.onap.datalake.feeder.repository.DesignRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
+import org.onap.datalake.feeder.util.HttpClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Designs
+ *
+ * @author guochunmeng
+ */
+
+@Service
+public class DesignService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DesignRepository designRepository;
+
+ @Autowired
+ private TopicNameRepository topicNameRepository;
+
+ @Autowired
+ private DesignTypeRepository designTypeRepository;
+
+ @Autowired
+ private ApplicationConfiguration applicationConfiguration;
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ public Design fillDesignConfiguration(DesignConfig designConfig) {
+ Design design = new Design();
+ fillDesign(designConfig, design);
+ return design;
+ }
+
+ public void fillDesignConfiguration(DesignConfig designConfig, Design design) {
+ fillDesign(designConfig, design);
+ }
+
+ private void fillDesign(DesignConfig designConfig, Design design) throws IllegalArgumentException {
+
+ design.setId(designConfig.getId());
+ design.setBody(designConfig.getBody());
+ design.setName(designConfig.getName());
+ design.setNote(designConfig.getNote());
+ design.setSubmitted(designConfig.getSubmitted());
+
+		if (designConfig.getTopicName() == null)
+			throw new IllegalArgumentException("topicName is missing from the design config");
+		Optional<TopicName> topicName = topicNameRepository.findById(designConfig.getTopicName());
+		if (!topicName.isPresent())
+			throw new IllegalArgumentException("Cannot find topic name in table topic_name: " + designConfig.getTopicName());
+ design.setTopicName(topicName.get());
+
+		if (designConfig.getDesignType() == null)
+			throw new IllegalArgumentException("designType is missing from the design config");
+		Optional<DesignType> designType = designTypeRepository.findById(designConfig.getDesignType());
+		if (!designType.isPresent())
+			throw new IllegalArgumentException("Cannot find design type in table design_type: " + designConfig.getDesignType());
+ design.setDesignType(designType.get());
+
+		Set<Db> dbs = new HashSet<>();
+		if (designConfig.getDbs() != null) {
+			for (Integer item : designConfig.getDbs()) {
+				dbRepository.findById(item).ifPresent(dbs::add);
+			}
+			if (dbs.isEmpty()) {
+				design.getDbs().clear();//drop previously linked DBs when none of the requested ones exist
+			}
+		}
+		design.setDbs(dbs);
+ }
+
+ public Design getDesign(Integer id) {
+
+ Optional<Design> ret = designRepository.findById(id);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+	public List<DesignConfig> queryAllDesign() {
+		List<Design> designList = (List<Design>) designRepository.findAll();
+		List<DesignConfig> designConfigList = new ArrayList<>();
+		for (Design design : designList) {
+			designConfigList.add(design.getDesignConfig());
+		}
+		return designConfigList;
+	}
+
+ public Map<Integer, Boolean> deploy(Design design) {
+ Map<Integer, Boolean> resultMap = null;
+ DesignType designType = design.getDesignType();
+ DesignTypeEnum designTypeEnum = DesignTypeEnum.valueOf(designType.getId());
+
+ switch (designTypeEnum) {
+ case KIBANA_DB:
+ log.info("Deploy kibana dashboard");
+ resultMap = deployKibanaDashboardImport(design);
+ deploySave(resultMap, design);
+ break;
+ case ES_MAPPING:
+ log.info("Deploy elasticsearch mapping template");
+ resultMap = postEsMappingTemplate(design, design.getTopicName().getId().toLowerCase());
+ deploySave(resultMap, design);
+ break;
+ default:
+ log.error("Not implemented {}", designTypeEnum);
+ break;
+ }
+ log.info("Response resultMap: " + resultMap);
+ return resultMap;
+ }
+
+	private Map<Integer, Boolean> deployKibanaDashboardImport(Design design) {
+		String urlFlag = "Kibana";
+		String postFlag = "KibanaDashboardImport";
+		String requestBody = design.getBody();
+		Set<Db> dbs = design.getDbs();
+		Map<Integer, Boolean> deployKibanaMap = new HashMap<>();
+
+		if (!dbs.isEmpty()) {
+			Map<Integer, String> map = urlMap(dbs, urlFlag);
+			log.info("Deploy kibana dashboard url map: {}", map);
+			for (Map.Entry<Integer, String> entry : map.entrySet()) {
+				deployKibanaMap.put(entry.getKey(), HttpClientUtil.sendHttpClientPost(entry.getValue(), requestBody, postFlag, urlFlag));
+			}
+		}
+		return deployKibanaMap;
+	}
+
+	/**
+	 * successful response: { "acknowledged": true }
+	 *
+	 * @param design design whose body is posted
+	 * @param templateName name of the Elasticsearch mapping template
+	 * @return map of db id to deployment result
+	 */
+	public Map<Integer, Boolean> postEsMappingTemplate(Design design, String templateName) {
+		String urlFlag = "Elasticsearch";
+		String postFlag = "ElasticsearchMappingTemplate";
+		String requestBody = design.getBody();
+		Set<Db> dbs = design.getDbs();
+		Map<Integer, Boolean> deployEsMap = new HashMap<>();
+
+		if (!dbs.isEmpty()) {
+			Map<Integer, String> map = urlMap(dbs, urlFlag);
+			log.info("Deploy elasticsearch url map: {}", map);
+			for (Map.Entry<Integer, String> entry : map.entrySet()) {
+				deployEsMap.put(entry.getKey(), HttpClientUtil.sendHttpClientPost(entry.getValue() + templateName, requestBody, postFlag, urlFlag));
+			}
+		}
+		return deployEsMap;
+	}
+
+	private Map<Integer, String> urlMap(Set<Db> dbs, String flag) {
+ Map<Integer, String> map = new HashMap<>();
+ for (Db item : dbs) {
+ if (item.isEnabled()) {
+ map.put(item.getId(), httpRequestUrl(item.getHost(), item.getPort(), flag));
+ }
+ }
+ return map;
+ }
+
+ private String httpRequestUrl(String host, Integer port, String urlFlag) {
+ String url = "";
+ switch (urlFlag) {
+ case "Kibana":
+ if (port == null) {
+ port = applicationConfiguration.getKibanaPort();
+ }
+ url = "http://" + host + ":" + port + applicationConfiguration.getKibanaDashboardImportApi();
+ log.info("Kibana url: " + url);
+ break;
+ case "Elasticsearch":
+ if (port == null) {
+ port = applicationConfiguration.getEsPort();
+ }
+ url = "http://" + host + ":" + port + applicationConfiguration.getEsTemplateMappingApi();
+ log.info("Elasticsearch url: " + url);
+ break;
+ default:
+ break;
+ }
+ return url;
+ }
+
+	private void deploySave(Map<Integer, Boolean> map, Design design) {
+		for (Map.Entry<Integer, Boolean> entry : map.entrySet()) {
+			if (entry.getValue()) {
+				design.setSubmitted(true);
+				designRepository.save(design);
+				log.info("Status was modified");
+			}
+		}
+	}
+
+}
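End to end, a deployment call looks roughly like this (a sketch; the design id is invented): the design body is POSTed to every enabled DB resolved by urlMap(), and deploySave() flips the submitted flag when any POST succeeds:

    Design design = designService.getDesign(1);
    Map<Integer, Boolean> results = designService.deploy(design);
    results.forEach((dbId, ok) -> log.info("db {} deployed: {}", dbId, ok));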
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java
new file mode 100755
index 00000000..58bc35e4
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.DesignType;
+import org.onap.datalake.feeder.dto.DesignTypeConfig;
+import org.onap.datalake.feeder.repository.DesignTypeRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for designTypes
+ *
+ * @author guochunmeng
+ */
+@Service
+public class DesignTypeService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ DesignTypeRepository designTypeRepository;
+
+	public List<DesignTypeConfig> getDesignTypes() {
+		List<DesignType> designTypeList = (List<DesignType>) designTypeRepository.findAll();
+		List<DesignTypeConfig> designTypeConfigList = new ArrayList<>();
+		for (DesignType designType : designTypeList) {
+			designTypeConfigList.add(designType.getDesignTypeConfig());
+		}
+		return designTypeConfigList;
+	}
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 3be5be6e..671234ba 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -21,24 +21,28 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -48,6 +52,7 @@ import org.springframework.stereotype.Service;
*
*/
@Service
+@Scope("prototype")
public class DmaapService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -60,16 +65,29 @@ public class DmaapService {
private ZooKeeper zk;
+ private Kafka kafka;
+
+ public DmaapService(Kafka kafka) {
+ this.kafka = kafka;
+ }
+
@PreDestroy
public void cleanUp() throws InterruptedException {
- if (zk != null) {
- zk.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ if (zk != null) {
+ log.info("cleanUp() called, close zk.");
+ zk.close();
+ }
+ } finally {
+ config.getShutdownLock().readLock().unlock();
}
}
@PostConstruct
private void init() throws IOException, InterruptedException {
- zk = connect(config.getDmaapZookeeperHostPort());
+ zk = connect(kafka.getZooKeeper());
}
//get all topic names from Zookeeper
@@ -77,11 +95,11 @@ public class DmaapService {
public List<String> getTopics() {
try {
if (zk == null) {
- zk = connect(config.getDmaapZookeeperHostPort());
+ zk = connect(kafka.getZooKeeper());
}
- log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
+ log.info("connecting to ZooKeeper {} for a list of topics.", kafka.getZooKeeper());
List<String> topics = zk.getChildren("/brokers/topics", false);
- String[] excludes = config.getDmaapKafkaExclude();
+ String[] excludes = kafka.getExcludedTopic().split(",");
topics.removeAll(Arrays.asList(excludes));
log.info("list of topics: {}", topics);
return topics;
@@ -93,7 +111,7 @@ public class DmaapService {
}
private ZooKeeper connect(String host) throws IOException, InterruptedException {
- log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+ log.info("connecting to ZooKeeper {} ...", kafka.getZooKeeper());
CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
public void process(WatchedEvent we) {
@@ -119,18 +137,20 @@ public class DmaapService {
return ret;
}
*/
- public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+ public Map<String, List<EffectiveTopic>> getActiveEffectiveTopic() throws IOException {
log.debug("entering getActiveTopicConfigs()...");
- List<String> allTopics = getTopics();
+		List<String> allTopics = getTopics(); //topics in the Kafka cluster; TODO: update table topic_name with new topics
- List<TopicConfig> ret = new ArrayList<>(allTopics.size());
+ Map<String, List<EffectiveTopic>> ret = new HashMap<>();
for (String topicStr : allTopics) {
log.debug("get topic setting from DB: {}.", topicStr);
- TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
- if (topicConfig.isEnabled()) {
- ret.add(topicConfig);
+			List<EffectiveTopic> effectiveTopics = topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
+			if (CollectionUtils.isNotEmpty(effectiveTopics)) {
+				log.debug("add effectiveTopics {}:{}.", topicStr, effectiveTopics);
+				ret.put(topicStr, effectiveTopics);
}
+
}
return ret;
}
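The returned map is keyed by raw Kafka topic name; each value lists the effective, DB-resolved topic settings for that name. A consuming sketch:

    Map<String, List<EffectiveTopic>> active = dmaapService.getActiveEffectiveTopic();
    for (Map.Entry<String, List<EffectiveTopic>> entry : active.entrySet()) {
        // entry.getKey(): Kafka topic; entry.getValue(): settings to apply when storing its messages
    }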
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
new file mode 100644
index 00000000..2e959fa2
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.KafkaConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Service for Kafka clusters
+ *
+ * @author guochunmeng
+ */
+@Service
+public class KafkaService {
+
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ public Kafka getKafkaById(int id) {
+
+ Optional<Kafka> ret = kafkaRepository.findById(id);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public List<KafkaConfig> getAllKafka() {
+
+ List<KafkaConfig> kafkaConfigList = new ArrayList<>();
+ Iterable<Kafka> kafkaIterable = kafkaRepository.findAll();
+		for (Kafka kafka : kafkaIterable) {
+			kafkaConfigList.add(kafka.getKafkaConfig());
+ }
+ return kafkaConfigList;
+ }
+
+ public Kafka fillKafkaConfiguration(KafkaConfig kafkaConfig) {
+ Kafka kafka = new Kafka();
+ fillKafka(kafkaConfig, kafka);
+ return kafka;
+ }
+
+ public void fillKafkaConfiguration(KafkaConfig kafkaConfig, Kafka kafka) {
+ fillKafka(kafkaConfig, kafka);
+ }
+
+ private void fillKafka(KafkaConfig kafkaConfig, Kafka kafka) {
+
+ kafka.setId(kafkaConfig.getId());
+ kafka.setBrokerList(kafkaConfig.getBrokerList());
+ kafka.setConsumerCount(kafkaConfig.getConsumerCount());
+ kafka.setEnabled(kafkaConfig.isEnabled());
+ kafka.setExcludedTopic(kafkaConfig.getExcludedTopic());
+ kafka.setIncludedTopic(kafkaConfig.getIncludedTopic());
+ kafka.setGroup(kafkaConfig.getGroup());
+ kafka.setLogin(kafkaConfig.getLogin());
+ kafka.setName(kafkaConfig.getName());
+ kafka.setPass(kafkaConfig.getPass());
+ kafka.setSecure(kafkaConfig.isSecure());
+ kafka.setSecurityProtocol(kafkaConfig.getSecurityProtocol());
+ kafka.setTimeout(kafkaConfig.getTimeout());
+ kafka.setZooKeeper(kafkaConfig.getZooKeeper());
+
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 7ed88797..09a59ee3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -21,14 +21,19 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -45,17 +50,20 @@ public class PullService {
private boolean isRunning = false;
private ExecutorService executorService;
- private Thread topicConfigPollingThread;
+ private Set<Puller> pullers;
@Autowired
- private Puller puller;
+ private KafkaRepository kafkaRepository;
@Autowired
private TopicConfigPollingService topicConfigPollingService;
-
+
@Autowired
private ApplicationConfiguration config;
+ @Autowired
+ private ApplicationContext context;
+
/**
* @return the isRunning
*/
@@ -73,23 +81,33 @@ public class PullService {
return;
}
- logger.info("start pulling ...");
- int numConsumers = config.getKafkaConsumerCount();
- executorService = Executors.newFixedThreadPool(numConsumers);
+ logger.info("PullService starting ...");
- for (int i = 0; i < numConsumers; i++) {
- executorService.submit(puller);
+ pullers = new HashSet<>();
+ executorService = Executors.newCachedThreadPool();
+
+ Iterable<Kafka> kafkas = kafkaRepository.findAll();
+ for (Kafka kafka : kafkas) {
+ if (kafka.isEnabled()) {
+ doKafka(kafka);
+ }
}
-
- topicConfigPollingThread = new Thread(topicConfigPollingService);
- topicConfigPollingThread.setName("TopicConfigPolling");
- topicConfigPollingThread.start();
+ executorService.submit(topicConfigPollingService);
+
isRunning = true;
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
+ private void doKafka(Kafka kafka) {
+ Puller puller = context.getBean(Puller.class, kafka);
+ pullers.add(puller);
+ for (int i = 0; i < kafka.getConsumerCount(); i++) {
+ executorService.submit(puller);
+ }
+ }
+
/**
* stop pulling
*/
@@ -98,20 +116,23 @@ public class PullService {
return;
}
- logger.info("stop pulling ...");
- puller.shutdown();
-
- logger.info("stop TopicConfigPollingService ...");
- topicConfigPollingService.shutdown();
-
+ config.getShutdownLock().writeLock().lock();
try {
- topicConfigPollingThread.join();
-
+ logger.info("stop pulling ...");
+ for (Puller puller : pullers) {
+ puller.shutdown();
+ }
+
+ logger.info("stop executorService ...");
executorService.shutdown();
executorService.awaitTermination(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.error("executor.awaitTermination", e);
+ logger.error("shutdown(): executor.awaitTermination", e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("shutdown error.", e);
+ } finally {
+ config.getShutdownLock().writeLock().unlock();
}
isRunning = false;
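
The getShutdownLock() calls above and in the @PreDestroy cleanUp() methods further down form one pattern that is easier to read in isolation: shutdown() holds the write lock while pullers drain, and every store's cleanUp() takes the read lock, so no client is closed while a flush is in flight. A sketch of the pattern, assuming the lock exposed by ApplicationConfiguration is a ReentrantReadWriteLock:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ShutdownCoordination {
        private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();

        void shutdown() {                       // PullService.shutdown()
            shutdownLock.writeLock().lock();
            try {
                // stop pullers, await executor termination
            } finally {
                shutdownLock.writeLock().unlock();
            }
        }

        void cleanUp() {                        // any DbStoreService @PreDestroy
            shutdownLock.readLock().lock();
            try {
                // flush buffers, close the client
            } finally {
                shutdownLock.readLock().unlock();
            }
        }
    }
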
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 9e4ab455..ab99ad09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -26,11 +26,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -39,10 +39,10 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
*/
@Service
-//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Scope("prototype")
public class Puller implements Runnable {
@Autowired
@@ -68,10 +68,17 @@ public class Puller implements Runnable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ //KafkaConsumer is not thread-safe.
private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text
private boolean active = false;
private boolean async;
+
+ private Kafka kafka;
+
+ public Puller(Kafka kafka) {
+ this.kafka = kafka;
+ }
@PostConstruct
private void init() {
@@ -81,8 +88,8 @@ public class Puller implements Runnable {
private Properties getConsumerConfig() {
Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, kafka.getGroup());
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -90,9 +97,12 @@ public class Puller implements Runnable {
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- // consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- // consumerConfig.put("sasl.mechanism", "PLAIN");
-
+ if (kafka.isSecure()) {
+			String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + kafka.getLogin() + "\" password=\"" + kafka.getPass() + "\" serviceName=\"kafka\";";
+ consumerConfig.put("sasl.jaas.config", jaas);
+ consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafka.getSecurityProtocol());
+ consumerConfig.put("sasl.mechanism", "PLAIN");
+ }
return consumerConfig;
}
@@ -103,7 +113,7 @@ public class Puller implements Runnable {
public void run() {
active = true;
Properties consumerConfig = getConsumerConfig();
- log.info("Kafka ConsumerConfig: {}", consumerConfig);
+ log.info("Kafka: {}, ConsumerConfig: {}", kafka, consumerConfig);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
consumerLocal.set(consumer);
@@ -111,8 +121,8 @@ public class Puller implements Runnable {
try {
while (active) {
- if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
- List<String> topics = topicConfigPollingService.getActiveTopics();
+ if (topicConfigPollingService.isActiveTopicsChanged(kafka)) {
+ Collection<String> topics = topicConfigPollingService.getActiveTopics(kafka);
log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
consumer.subscribe(topics, rebalanceListener);
}
@@ -132,7 +142,7 @@ public class Puller implements Runnable {
KafkaConsumer<String, String> consumer = consumerLocal.get();
log.debug("pulling...");
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(kafka.getTimeout()));
log.debug("done pulling.");
if (records != null && records.count() > 0) {
@@ -144,10 +154,10 @@ public class Puller implements Runnable {
messages.add(Pair.of(record.timestamp(), record.value()));
//log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
}
- storeService.saveMessages(partition.topic(), messages);
+ storeService.saveMessages(kafka, partition.topic(), messages);
log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+					if (!async) {//for reliability, synchronously commit the offset to Kafka right after saving to the data store; this slows things down a bit
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
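
For reference, the properties the secure branch above produces once the JAAS values are quoted as Kafka's PlainLoginModule expects; imports are as in Puller, and the broker, user and password here are placeholders:

    Properties p = new Properties();
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");          // kafka.getBrokerList()
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "datalake");                       // kafka.getGroup()
    p.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");   // kafka.getSecurityProtocol()
    p.put("sasl.mechanism", "PLAIN");
    p.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"admin\" password=\"admin-secret\" serviceName=\"kafka\";");
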
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 2a2f997e..0e54b9b5 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import javax.annotation.PostConstruct;
@@ -32,8 +34,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.service.db.DbStoreService;
import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,19 +65,10 @@ public class StoreService {
private ApplicationConfiguration config;
@Autowired
- private TopicConfigPollingService configPollingService;
-
- @Autowired
- private MongodbService mongodbService;
-
- @Autowired
- private CouchbaseService couchbaseService;
-
- @Autowired
- private ElasticsearchService elasticsearchService;
+ private DbService dbService;
@Autowired
- private HdfsService hdfsService;
+ private TopicConfigPollingService configPollingService;
private ObjectMapper yamlReader;
@@ -81,43 +77,57 @@ public class StoreService {
yamlReader = new ObjectMapper(new YAMLFactory());
}
- public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+ public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
if (CollectionUtils.isEmpty(messages)) {
return;
}
- TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
+ Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+ for (EffectiveTopic effectiveTopic : effectiveTopics) {
+ saveMessagesForTopic(effectiveTopic, messages);
+ }
+ }
+
+ private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+ if (!effectiveTopic.getTopic().isEnabled()) {
+ log.error("we should not come here {}", effectiveTopic);
+ return;
+ }
List<JSONObject> docs = new ArrayList<>();
for (Pair<Long, String> pair : messages) {
try {
- docs.add(messageToJson(topicConfig, pair));
+ docs.add(messageToJson(effectiveTopic, pair));
} catch (Exception e) {
//may see org.json.JSONException.
log.error("Error when converting this message to JSON: " + pair.getRight(), e);
}
}
- saveJsons(topicConfig, docs, messages);
+ Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+ for (Db db : dbs) {
+ if (db.isTool() || db.isDruid() || !db.isEnabled()) {
+ continue;
+ }
+ DbStoreService dbStoreService = dbService.findDbStoreService(db);
+ if (dbStoreService != null) {
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
+ }
}
- private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
- //for debug, to be remove
- // String topicStr = topic.getId();
- // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
- //}
-
- boolean storeRaw = topicConfig.isSaveRaw();
+ boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topicConfig.getDataFormat2();
+ DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -148,15 +158,15 @@ public class StoreService {
json.put(config.getRawDataLabel(), text);
}
- if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
- String[] paths = topicConfig.getAggregateArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
for (String path : paths) {
JsonUtil.arrayAggregate(path, json);
}
}
- if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
- String[] paths = topicConfig.getFlattenArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
for (String path : paths) {
JsonUtil.flattenArray(path, json);
}
@@ -165,29 +175,11 @@ public class StoreService {
return json;
}
- private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
- if (topic.supportMongoDB()) {
- mongodbService.saveJsons(topic, jsons);
- }
-
- if (topic.supportCouchbase()) {
- couchbaseService.saveJsons(topic, jsons);
- }
-
- if (topic.supportElasticsearch()) {
- elasticsearchService.saveJsons(topic, jsons);
- }
-
- if (topic.supportHdfs()) {
- hdfsService.saveMessages(topic, messages);
- }
- }
-
public void flush() { //force flush all buffer
- hdfsService.flush();
+ // hdfsService.flush();
}
public void flushStall() { //flush stall buffer
- hdfsService.flushStall();
+ // hdfsService.flushStall();
}
}
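
messageToJson() above enriches every record the same way regardless of sink; a single record's life is roughly the following, where the label values come from ApplicationConfiguration and the paths are hypothetical topic settings:

    // DataFormat.JSON branch, sketched; timestamp is the Kafka record timestamp
    JSONObject json = new JSONObject(text);
    json.put(config.getTimestampLabel(), timestamp);   // added for every record
    if (effectiveTopic.getTopic().isSaveRaw()) {
        json.put(config.getRawDataLabel(), text);      // keep the original payload
    }
    // then optional reshaping, driven by (hypothetical) topic settings:
    //   aggregateArrayPath = "/event/measurements" -> JsonUtil.arrayAggregate(path, json)
    //   flattenArrayPath   = "/event/alarms"       -> JsonUtil.flattenArray(path, json)
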
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 58b27834..a02cd6a2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -21,23 +21,27 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
- * Service to check topic changes in Kafka and topic setting updates
+ * Service to check topic changes in Kafka and topic setting updates in DB
*
* @author Guobiao Mo
*
@@ -51,70 +55,93 @@ public class TopicConfigPollingService implements Runnable {
ApplicationConfiguration config;
@Autowired
- private DmaapService dmaapService;
+ private ApplicationContext context;
- //effective TopicConfig Map
- private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
-
- //monitor Kafka topic list changes
- private List<String> activeTopics;
- private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
- private int currentActiveTopicsVersion = -1;
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
+ private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
+ //private Map<String, TopicConfig> effectiveTopicConfigMap;
+
+ //monitor Kafka topic list changes, key is kafka id, value is active Topics
+ private Map<Integer, Set<String>> activeTopicMap;
+
+ private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version
+ private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version
+ private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
private boolean active = false;
@PostConstruct
private void init() {
try {
- log.info("init(), ccalling poll()...");
- activeTopics = poll();
- currentActiveTopicsVersion++;
+ log.info("init(), calling poll()...");
+ activeTopicMap = poll();
} catch (Exception ex) {
log.error("error connection to HDFS.", ex);
}
}
- public boolean isActiveTopicsChanged(boolean update) {
- boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
- log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
- if (changed && update) {
- activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+	public boolean isActiveTopicsChanged(Kafka kafka) {//the local version is synced whenever a change is detected
+		int kafkaId = kafka.getId();
+		int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//the poll in init() counts as version 1
+ int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
+
+ boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
+ log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
+ if (changed) {
+ activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
}
return changed;
}
- public List<String> getActiveTopics() {
- return activeTopics;
+ //get a list of topic names to monitor
+ public Collection<String> getActiveTopics(Kafka kafka) {
+ return activeTopicMap.get(kafka.getId());
}
- public TopicConfig getEffectiveTopicConfig(String topicStr) {
- return effectiveTopicConfigMap.get(topicStr);
+ //get the EffectiveTopics given kafka and topic name
+ public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) {
+ Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId());
+ return effectiveTopicMapKafka.get(topicStr);
}
@Override
public void run() {
active = true;
log.info("TopicConfigPollingService started.");
-
+
while (active) {
try { //sleep first since we already pool in init()
- Thread.sleep(config.getDmaapCheckNewTopicInterval());
+ Thread.sleep(config.getCheckTopicInterval());
+ if(!active) {
+ break;
+ }
} catch (InterruptedException e) {
log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
Thread.currentThread().interrupt();
}
try {
- List<String> newTopics = poll();
- if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
- log.info("activeTopics list is updated, old={}", activeTopics);
- log.info("activeTopics list is updated, new={}", newTopics);
-
- activeTopics = newTopics;
- currentActiveTopicsVersion++;
- } else {
- log.debug("activeTopics list is not updated.");
+ Map<Integer, Set<String>> newTopicsMap = poll();
+
+ for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) {
+ Integer kafkaId = entry.getKey();
+ Set<String> newTopics = entry.getValue();
+
+ Set<String> activeTopics = activeTopicMap.get(kafkaId);
+
+ if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
+ log.info("activeTopics list is updated, old={}", activeTopics);
+ log.info("activeTopics list is updated, new={}", newTopics);
+
+ activeTopicMap.put(kafkaId, newTopics);
+ currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1);
+ } else {
+ log.debug("activeTopics list is not updated.");
+ }
}
} catch (IOException e) {
log.error("dmaapService.getActiveTopics()", e);
@@ -128,13 +155,31 @@ public class TopicConfigPollingService implements Runnable {
active = false;
}
- private List<String> poll() throws IOException {
+ private Map<Integer, Set<String>> poll() throws IOException {
+ Map<Integer, Set<String>> ret = new HashMap<>();
+ Iterable<Kafka> kafkas = kafkaRepository.findAll();
+ for (Kafka kafka : kafkas) {
+ if (kafka.isEnabled()) {
+ Set<String> topics = poll(kafka);
+ ret.put(kafka.getId(), topics);
+ }
+ }
+ return ret;
+ }
+
+ private Set<String> poll(Kafka kafka) throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
- List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
- activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
- List<String> ret = new ArrayList<>(activeTopicConfigs.size());
- activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+ DmaapService dmaapService = dmaapServiceMap.get(kafka.getId());
+		if (dmaapService == null) {
+ dmaapService = context.getBean(DmaapService.class, kafka);
+ dmaapServiceMap.put(kafka.getId(), dmaapService);
+ }
+
+ Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
+ effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
+
+ Set<String> ret = activeEffectiveTopics.keySet();
return ret;
}
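
The per-Kafka change detection above compresses to a small pattern: the single polling thread bumps a shared version counter per kafka id, and each puller thread remembers the last version it acted on in a ThreadLocal. A stripped-down sketch (a ConcurrentHashMap would make the cross-thread visibility explicit; the patch uses plain HashMaps):

    import java.util.HashMap;
    import java.util.Map;

    class PerKafkaVersions {
        private final Map<Integer, Integer> current = new HashMap<>();   // written by the polling thread
        private final ThreadLocal<Map<Integer, Integer>> seen =
                ThreadLocal.withInitial(HashMap::new);                   // one map per puller thread

        void bump(int kafkaId) {                                         // poll() saw a different topic set
            current.put(kafkaId, current.getOrDefault(kafkaId, 1) + 1);
        }

        boolean changedAndSync(int kafkaId) {                            // called from Puller.run()
            int cur = current.getOrDefault(kafkaId, 1);                  // the poll in init() counts as version 1
            int old = seen.get().getOrDefault(kafkaId, 0);
            if (cur > old) {
                seen.get().put(kafkaId, cur);                            // consume the change locally
                return true;
            }
            return false;
        }
    }
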
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 64e8b8b1..043cc653 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -21,23 +21,31 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.KafkaRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service for topics
+ * Service for topics
*
* @author Guobiao Mo
*
@@ -49,72 +57,93 @@ public class TopicService {
@Autowired
private ApplicationConfiguration config;
-
+
+ @Autowired
+ private TopicNameRepository topicNameRepository;
+
@Autowired
private TopicRepository topicRepository;
@Autowired
- private ElasticsearchService elasticsearchService;
+ private DbRepository dbRepository;
+ @Autowired
+ private DbService dbService;
@Autowired
- private DbRepository dbRepository;
+ private KafkaRepository kafkaRepository;
+
+ public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
- public TopicConfig getEffectiveTopic(String topicStr) {
- try {
- return getEffectiveTopic(topicStr, false);
- } catch (IOException e) {
- log.error(topicStr, e);
+ List<Topic> topics = findTopics(kafka, topicStr);
+ if (CollectionUtils.isEmpty(topics)) {
+ topics = new ArrayList<>();
+ topics.add(getDefaultTopic(kafka));
}
- return null;
- }
- public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
- Topic topic = getTopic(topicStr);
- if (topic == null) {
- topic = getDefaultTopic();
+ List<EffectiveTopic> ret = new ArrayList<>();
+ for (Topic topic : topics) {
+ if (!topic.isEnabled()) {
+ continue;
+ }
+ ret.add(new EffectiveTopic(topic, topicStr));
+
+ if (ensureTableExist) {
+ for (Db db : topic.getDbs()) {
+ if (db.isElasticsearch()) {
+ ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
+ elasticsearchService.ensureTableExist(topicStr);
+ }
+ }
+ }
}
- TopicConfig topicConfig = topic.getTopicConfig();
- topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic
+
+ return ret;
+ }
+
+ //TODO use query
+ public List<Topic> findTopics(Kafka kafka, String topicStr) {
+ List<Topic> ret = new ArrayList<>();
- if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) {
- elasticsearchService.ensureTableExist(topicStr);
+ Iterable<Topic> allTopics = topicRepository.findAll();
+ for(Topic topic: allTopics) {
+			if (topic.getKafkas().contains(kafka) && topic.getTopicName().getId().equals(topicStr)) {
+ ret.add(topic);
+ }
}
- return topicConfig;
+ return ret;
}
- public Topic getTopic(String topicStr) {
- Optional<Topic> ret = topicRepository.findById(topicStr);
+ public Topic getTopic(int topicId) {
+ Optional<Topic> ret = topicRepository.findById(topicId);
return ret.isPresent() ? ret.get() : null;
}
- public Topic getDefaultTopic() {
- return getTopic(config.getDefaultTopicName());
+ public Topic getDefaultTopic(Kafka kafka) {
+ return findTopics(kafka, config.getDefaultTopicName()).get(0);
}
- public boolean istDefaultTopic(Topic topic) {
+ public boolean isDefaultTopic(Topic topic) {
if (topic == null) {
return false;
}
return topic.getName().equals(config.getDefaultTopicName());
}
- public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic)
- {
+ public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) {
fillTopic(tConfig, wTopic);
}
- public Topic fillTopicConfiguration(TopicConfig tConfig)
- {
+ public Topic fillTopicConfiguration(TopicConfig tConfig) {
Topic topic = new Topic();
fillTopic(tConfig, topic);
return topic;
}
- private void fillTopic(TopicConfig tConfig, Topic topic)
- {
+ private void fillTopic(TopicConfig tConfig, Topic topic) {
Set<Db> relateDb = new HashSet<>();
- topic.setName(tConfig.getName());
+ topic.setId(tConfig.getId());
+		topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get());//assumes the TopicName row exists; otherwise Optional.get() throws NoSuchElementException
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
topic.setEnabled(tConfig.isEnabled());
@@ -126,24 +155,38 @@ public class TopicService {
topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
- if(tConfig.getSinkdbs() != null) {
+ if (tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
Db sinkdb = dbRepository.findByName(item);
if (sinkdb != null) {
relateDb.add(sinkdb);
}
}
- if(relateDb.size() > 0)
+ if (!relateDb.isEmpty())
topic.setDbs(relateDb);
- else if(relateDb.size() == 0)
- {
+ else {
topic.getDbs().clear();
}
- }else
- {
+ } else {
topic.setDbs(relateDb);
}
+ Set<Kafka> relateKafka = new HashSet<>();
+ if (tConfig.getKafkas() != null) {
+ for (int item : tConfig.getKafkas()) {
+ Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
+ if (sinkKafka.isPresent()) {
+ relateKafka.add(sinkKafka.get());
+ }
+ }
+ if (!relateKafka.isEmpty()) {
+ topic.setKafkas(relateKafka);
+ } else {
+ topic.getKafkas().clear();
+ }
+ } else {
+ topic.setKafkas(relateKafka);
+ }
}
}
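
The //TODO use query in findTopics() could be met with a Spring Data derived query instead of scanning every row; a hypothetical signature, not part of this patch, assuming kafkas and topicName are mapped associations on Topic and the repository's key type is the new int id:

    public interface TopicRepository extends CrudRepository<Topic, Integer> {
        // hypothetical: topics linked to this kafka whose TopicName id equals topicStr
        List<Topic> findByKafkasContainingAndTopicNameId(Kafka kafka, String topicNameId);
    }
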
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index d7d5f873..44b940a2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.util.ArrayList;
import java.util.List;
@@ -30,10 +30,12 @@ import javax.annotation.PreDestroy;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.couchbase.client.java.Bucket;
@@ -55,25 +57,27 @@ import rx.functions.Func1;
*
*/
@Service
-public class CouchbaseService {
+@Scope("prototype")
+public class CouchbaseService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
ApplicationConfiguration config;
-
- @Autowired
- private DbService dbService;
-
+
+ private Db couchbase;
+ //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html
Bucket bucket;
- private boolean isReady = false;
-
+
+ public CouchbaseService(Db db) {
+ couchbase = db;
+ }
+
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
// Initialize Couchbase Connection
try {
- Db couchbase = dbService.getCouchbase();
-
//this tunes the SDK (to customize connection timeout)
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s
.build();
@@ -84,19 +88,27 @@ public class CouchbaseService {
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin());
- isReady = true;
+// isReady = true;
} catch (Exception ex) {
log.error("error connection to Couchbase.", ex);
- isReady = false;
+ // isReady = false;
}
}
@PreDestroy
public void cleanUp() {
- bucket.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("bucket.close() at cleanUp.");
+ bucket.close();
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ @Override
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
List<JsonDocument> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
@@ -105,9 +117,9 @@ public class CouchbaseService {
long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
- int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second
+ int expiry = (int) (timestamp / 1000L) + effectiveTopic.getTopic().getTtl() * 3600 * 24; //in second
- String id = getId(topic, json);
+ String id = getId(effectiveTopic.getTopic(), json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
@@ -126,10 +138,10 @@ public class CouchbaseService {
} catch (Exception e) {
log.error("error saving to Couchbase.", e);
}
- log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
+ log.debug("saved text to topic = {}, this batch count = {} ", effectiveTopic, documents.size());
}
- public String getId(TopicConfig topic, JSONObject json) {
+ public String getId(Topic topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
if (id != null) {
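
The expiry arithmetic above is worth spelling out: Couchbase interprets expiry values larger than 30 days' worth of seconds as an absolute Unix timestamp, which is what the patch computes. With illustrative numbers:

    long timestampMs = 1_560_000_000_000L;                  // Kafka record timestamp (example)
    int ttlDays = 7;                                        // effectiveTopic.getTopic().getTtl() (example)
    int expiry = (int) (timestampMs / 1000L) + ttlDays * 3600 * 24;
    // 1_560_000_000 + 604_800 = 1_560_604_800 epoch seconds: well past the
    // 30-day threshold, so Couchbase treats it as an absolute expiry time.
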
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
new file mode 100644
index 00000000..c873c010
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
@@ -0,0 +1,39 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service.db;
+
+import java.util.List;
+
+import org.json.JSONObject;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+
+/**
+ * Interface for all db store services
+ *
+ * @author Guobiao Mo
+ *
+ */
+public interface DbStoreService {
+
+ void saveJsons(EffectiveTopic topic, List<JSONObject> jsons);
+
+ void init();
+}
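
New sinks plug in by implementing this interface as a prototype-scoped bean that receives its Db row through the constructor, as the services below do. A minimal hypothetical implementation (LoggingStoreService is not part of the patch); DbService.findDbStoreService(db) would then presumably create it via context.getBean(..., db):

    package org.onap.datalake.feeder.service.db;

    import java.util.List;

    import org.json.JSONObject;
    import org.onap.datalake.feeder.domain.Db;
    import org.onap.datalake.feeder.domain.EffectiveTopic;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Service;

    @Service
    @Scope("prototype")
    public class LoggingStoreService implements DbStoreService { // hypothetical sink

        private final Db db;

        public LoggingStoreService(Db db) {
            this.db = db;
        }

        @Override
        public void init() {
            // open connections using db.getHost(), db.getPort(), ...
        }

        @Override
        public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
            jsons.forEach(json -> System.out.println(topic.getName() + ": " + json));
        }
    }
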
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index 2806e48b..e303fa9b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.io.IOException;
import java.util.List;
@@ -47,11 +47,13 @@ import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -61,24 +63,28 @@ import org.springframework.stereotype.Service;
*
*/
@Service
-public class ElasticsearchService {
+@Scope("prototype")
+public class ElasticsearchService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private Db elasticsearch;
@Autowired
private ApplicationConfiguration config;
- @Autowired
- private DbService dbService;
-
- private RestHighLevelClient client;
+ private RestHighLevelClient client;//thread safe
ActionListener<BulkResponse> listener;
-
+
+ public ElasticsearchService(Db db) {
+ elasticsearch = db;
+ }
+
//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
- private void init() {
- Db elasticsearch = dbService.getElasticsearch();
+ @Override
+ public void init() {
String elasticsearchHost = elasticsearch.getHost();
// Initialize the Connection
@@ -89,7 +95,9 @@ public class ElasticsearchService {
listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
-
+ if(bulkResponse.hasFailures()) {
+ log.debug(bulkResponse.buildFailureMessage());
+ }
}
@Override
@@ -101,7 +109,16 @@ public class ElasticsearchService {
@PreDestroy
public void cleanUp() throws IOException {
- client.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("cleanUp() closing Elasticsearch client.");
+ client.close();
+ } catch (IOException e) {
+ log.error("client.close() at cleanUp.", e);
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
public void ensureTableExist(String topic) throws IOException {
@@ -119,35 +136,41 @@ public class ElasticsearchService {
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ @Override
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
+
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
- if (topic.isCorrelateClearedMessage()) {
- boolean found = correlateClearedMessage(topic, json);
+ if (effectiveTopic.getTopic().isCorrelateClearedMessage()) {
+ boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json);
if (found) {
continue;
}
- }
-
- String id = topic.getMessageId(json); //id can be null
-
- request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
+ }
+
+ String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
+
+ request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
- log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
+ log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
if (config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
} else {
try {
- client.bulk(request, RequestOptions.DEFAULT);
+ BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
+ if(bulkResponse.hasFailures()) {
+ log.debug(bulkResponse.buildFailureMessage());
+ }
} catch (IOException e) {
- log.error(topic.getName(), e);
+ log.error(effectiveTopic.getName(), e);
}
}
+
}
-
+
/**
*
* @param topic
@@ -159,7 +182,7 @@ public class ElasticsearchService {
* source. So use the get API, three parameters: index, type, document
* id
*/
- private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
+ private boolean correlateClearedMessage(Topic topic, JSONObject json) {
boolean found = false;
String eName = null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
index 135a2c09..1725ee41 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.io.IOException;
import java.net.InetAddress;
@@ -32,23 +32,23 @@ import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import lombok.Getter;
-import lombok.Setter;
/**
* Service to write data to HDFS
@@ -57,24 +57,22 @@ import lombok.Setter;
*
*/
@Service
-public class HdfsService {
+@Scope("prototype")
+public class HdfsService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
- @Autowired
- ApplicationConfiguration config;
+ private Db hdfs;
@Autowired
- private DbService dbService;
+ ApplicationConfiguration config;
FileSystem fileSystem;
- private boolean isReady = false;
private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
- @Setter
@Getter
private class Buffer {
long lastFlush;
@@ -93,7 +91,7 @@ public class HdfsService {
lastFlush = System.currentTimeMillis();
}
} catch (IOException e) {
- log.error("error saving to HDFS." + topic, e);
+ log.error("{} error saving to HDFS. {}", topic, e.getMessage());
}
}
@@ -104,12 +102,21 @@ public class HdfsService {
}
}
- public void addData(List<Pair<Long, String>> messages) {
+ /*
+ public void addData(List<Pair<Long, String>> messages) {
+ if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+ lastFlush = System.currentTimeMillis();
+ }
+
+ messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+ }
+ */
+ public void addData2(List<JSONObject> messages) {
if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
lastFlush = System.currentTimeMillis();
}
- messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+ messages.stream().forEach(message -> data.add(message.toString()));
}
private void saveMessages(String topic, List<String> bufferList) throws IOException {
@@ -134,20 +141,24 @@ public class HdfsService {
out.writeUTF(message);
out.write('\n');
} catch (IOException e) {
- log.error("error writing to HDFS.", e);
+ log.error("error writing to HDFS. {}", e.getMessage());
}
});
out.close();
+ log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
}
}
+ public HdfsService(Db db) {
+ hdfs = db;
+ }
+
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
// Initialize HDFS Connection
try {
- Db hdfs = dbService.getHdfs();
-
//Get configuration of Hadoop system
Configuration hdfsConfig = new Configuration();
@@ -161,45 +172,73 @@ public class HdfsService {
fileSystem = FileSystem.get(hdfsConfig);
- isReady = true;
+ //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
+ ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
+ hadoopShutdownHookManager.clearShutdownHooks();
+
} catch (Exception ex) {
log.error("error connection to HDFS.", ex);
- isReady = false;
}
}
@PreDestroy
public void cleanUp() {
+ config.getShutdownLock().readLock().lock();
+
try {
+ log.info("fileSystem.close() at cleanUp.");
flush();
fileSystem.close();
} catch (IOException e) {
log.error("fileSystem.close() at cleanUp.", e);
+ } finally {
+ config.getShutdownLock().readLock().unlock();
}
}
public void flush() {
+ log.info("Force flush ALL data, regardless of stall");
bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
}
//if no new data comes in for a topic for a while, need to flush its buffer
public void flushStall() {
+ log.debug("Flush stall data");
bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
}
- public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) {
+ /*
+ //used if raw data should be saved
+ public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
+ String topicStr = topic.getName();
+
+ Map<String, Buffer> bufferMap = bufferLocal.get();
+ final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+ buffer.addData(messages);
+
+ if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+ buffer.flush(topicStr);
+ } else {
+ log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+ }
+ }
+ */
+ @Override
+ public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
String topicStr = topic.getName();
Map<String, Buffer> bufferMap = bufferLocal.get();
final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
- buffer.addData(messages);
+ buffer.addData2(jsons);
if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
buffer.flush(topicStr);
} else {
- log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
+ log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
}
+
}
}
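
The clearShutdownHooks() call above exists because Hadoop registers its own JVM shutdown hook that closes cached FileSystem instances, and it can fire before the feeder's hook, dropping buffered records. The ordering the feeder needs at exit, as a sketch:

    // 1. PullService's hook (registered in start()): stop pullers and drain the
    //    executor under the shutdown write lock.
    // 2. Spring @PreDestroy HdfsService.cleanUp(): flush() remaining buffers,
    //    then fileSystem.close(), under the shutdown read lock.
    // Hadoop's default hook would perform the close before step 2's flush,
    // so it is disabled once the FileSystem is obtained:
    ShutdownHookManager.get().clearShutdownHooks();
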
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index 32d21c62..eb8a3a16 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,11 +34,12 @@ import org.bson.Document;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.mongodb.bulk.BulkWriteError;
@@ -47,6 +48,7 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoCredential;
+import com.mongodb.MongoTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -59,26 +61,30 @@ import com.mongodb.client.model.InsertManyOptions;
*
*/
@Service
-public class MongodbService {
+@Scope("prototype")
+public class MongodbService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private Db mongodb;
@Autowired
private ApplicationConfiguration config;
private boolean dbReady = false;
- @Autowired
- private DbService dbService;
-
private MongoDatabase database;
private MongoClient mongoClient;
+ //MongoCollection is ThreadSafe
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
+ public MongodbService(Db db) {
+ mongodb = db;
+ }
+
@PostConstruct
- private void init() {
- Db mongodb = dbService.getMongoDB();
-
+ @Override
+ public void init() {
String host = mongodb.getHost();
Integer port = mongodb.getPort();
@@ -103,14 +109,14 @@ public class MongodbService {
builder.sslEnabled(Boolean.TRUE.equals(mongodb.getEncrypt()));// getEncrypt() can be null
}
MongoClientOptions options = builder.build();
- List<ServerAddress> addrs = new ArrayList<ServerAddress>();
+ List<ServerAddress> addrs = new ArrayList<>();
addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address
try {
if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) {
credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray());
- List<MongoCredential> credentialList = new ArrayList<MongoCredential>();
+ List<MongoCredential> credentialList = new ArrayList<>();
credentialList.add(credential);
mongoClient = new MongoClient(addrs, credentialList, options);
} else {
@@ -131,25 +137,32 @@ public class MongodbService {
@PreDestroy
public void cleanUp() {
- mongoClient.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("mongoClient.close() at cleanUp.");
+ mongoClient.close();
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- if (dbReady == false)//TOD throw exception
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
+		if (!dbReady)//TODO throw exception
return;
List<Document> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
//convert org.json JSONObject to MongoDB Document
Document doc = Document.parse(json.toString());
- String id = topic.getMessageId(json); //id can be null
+ String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
if (id != null) {
doc.put("_id", id);
}
documents.add(doc);
}
- String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
+ String collectionName = effectiveTopic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
try {
@@ -159,9 +172,11 @@ public class MongodbService {
for (BulkWriteError bulkWriteError : bulkWriteErrors) {
log.error("Failed record: {}", bulkWriteError);
}
+ } catch (MongoTimeoutException e) {
+ log.error("saveJsons()", e);
}
- log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
+ log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
}
}
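
One consequence of the collection-name sanitization above, with a representative ONAP topic name:

    String name = "unauthenticated.SEC_FAULT_OUTPUT".replaceAll("[^a-zA-Z0-9]", "");
    // -> "unauthenticatedSECFAULTOUTPUT": dots, dashes and underscores are dropped,
    // so distinct topics differing only in punctuation would share a collection.
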
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
index 8a177cc7..51d3168e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
@@ -28,6 +28,8 @@ import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.Velocity;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
@@ -59,6 +61,8 @@ import java.util.Map.Entry;
@Getter
public class DruidSupervisorGenerator {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
Template template = null;
VelocityContext context;
@@ -90,7 +94,6 @@ public class DruidSupervisorGenerator {
while (fields.hasNext()) {
Entry<String, JsonNode> field = fields.next();
- // System.out.println("--------"+field.getKey()+"--------");
printNode(prefix + "." + field.getKey(), field.getValue());
}
@@ -113,25 +116,13 @@ public class DruidSupervisorGenerator {
private void printFlattenSpec(JsonNodeType type, String path) {
String name = path.substring(2).replace('.', ':');
// lets see what type the node is
- System.out.println("{");
- System.out.println("\"type\": \"path\",");
- System.out.println("\"name\": \"" + name + "\",");
- System.out.println("\"expr\": \"" + path + "\"");
- System.out.println("},");
+ log.info("{");
+ log.info("\"type\": \"path\",");
+ log.info("\"name\": \"" + name + "\",");
+ log.info("\"expr\": \"" + path + "\"");
+ log.info("},");
dimensions.add(new String[]{name, path});
- /*
- //for dimensionsSpec
- if (JsonNodeType.NUMBER.equals(type)) {
- System.out.println("{");
- System.out.println("\"type\": \"long\",");
- System.out.println("\"name\": \"" + name + "\",");
- System.out.println("},");
- } else {
- System.out.println("\"" + name + "\",");
-
- }
- */
}
public void doTopic(String topic) throws IOException {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java
new file mode 100644
index 00000000..64b643ac
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DCAE
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.util;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HttpClient
+ *
+ * @author guochunmeng
+ *
+ */
+public class HttpClientUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(HttpClientUtil.class);
+
+ private static final String KIBANA = "Kibana";
+
+ private static final String KIBANA_DASHBOARD_IMPORT = "KibanaDashboardImport";
+
+ private static final String ELASTICSEARCH_MAPPING_TEMPLATE = "ElasticsearchMappingTemplate";
+
+ private HttpClientUtil() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static boolean sendHttpClientPost(String url, String json, String postFlag, String urlFlag) {
+ boolean flag = false;
+ RestTemplate restTemplate = new RestTemplate();
+ HttpHeaders headers = new HttpHeaders();
+ if (urlFlag.equals(KIBANA)) {
+ log.info("urlFlag is Kibana, add header");
+ headers.add("kbn-xsrf","true");
+ }
+ headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
+ HttpEntity<String> request = new HttpEntity<>(json, headers);
+ ResponseEntity<String> responseEntity = null;
+ try {
+ responseEntity = restTemplate.postForEntity(url, request, String.class);
+ if (responseEntity.getStatusCodeValue() != 200)
+				throw new RestClientException("Request failed");
+ Gson gson = new Gson();
+ Map<String, Object> map = new HashMap<>();
+ map = gson.fromJson(responseEntity.getBody(), map.getClass());
+ switch (postFlag) {
+ case KIBANA_DASHBOARD_IMPORT:
+ flag = flagOfKibanaDashboardImport(map);
+ break;
+ case ELASTICSEARCH_MAPPING_TEMPLATE :
+ flag = flagOfPostEsMappingTemplate(map);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ log.debug("Resquest failed: " + e.getMessage());
+ }
+ return flag;
+ }
+
+ private static boolean flagOfKibanaDashboardImport(Map<String, Object> map) {
+
+ boolean flag = true;
+		List<?> objectsList = (List<?>) map.get("objects");
+
+		if (!objectsList.isEmpty()) {
+			for (Object item : objectsList) {
+				Map<String, Object> map2 = (Map<String, Object>) item;
+				if (map2.containsKey("error")) {
+					return false;
+				}
+			}
+		}
+ return flag;
+ }
+
+ private static boolean flagOfPostEsMappingTemplate(Map<String, Object> map) {
+
+ boolean flag = true;
+ for(String key : map.keySet()){
+ if ("acknowledged".equals(key) && (boolean) map.get("acknowledged")) {
+ break;
+ } else {
+ flag = false;
+ }
+ }
+ return flag;
+ }
+}
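
A usage sketch for the helper; the flag strings must match the constants above, and the URL and payload here are placeholders:

    String url = "http://kibana:5601/api/kibana/dashboards/import";   // placeholder
    String json = "{\"objects\": []}";                                // placeholder payload
    boolean ok = HttpClientUtil.sendHttpClientPost(url, json, "KibanaDashboardImport", "Kibana");
    if (!ok) {
        // the helper swallows HTTP errors and returns false, so callers must check
    }
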
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
index db4dcfae..5c77d895 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -22,7 +22,6 @@ package org.onap.datalake.feeder.util;
import java.util.HashMap;
-import org.apache.commons.collections.CollectionUtils;
import org.json.JSONArray;
import org.json.JSONObject;