diff options
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
48 files changed, 2729 insertions, 555 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java index e7a8e1b9..cde4d43d 100644 --- a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java +++ b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java @@ -42,15 +42,7 @@ public class CollectibleDocumentFieldNameValidator implements FieldNameValidator throw new IllegalArgumentException("Field name can not be null");
}
- /* dl change
- if (fieldName.contains(".")) {
- return false;
- }*/
-
- if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {
- return false;
- }
- return true;
+ return !fieldName.startsWith("$") || EXCEPTIONS.contains(fieldName);
}
@Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 73067182..b93924c4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -20,6 +20,8 @@ package org.onap.datalake.feeder.config; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -41,6 +43,8 @@ import lombok.Setter; @EnableAutoConfiguration public class ApplicationConfiguration { + final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock(); + //App general private boolean async; private boolean enableSSL; @@ -50,16 +54,7 @@ public class ApplicationConfiguration { private String defaultTopicName; - //DMaaP - private String dmaapZookeeperHostPort; - private String dmaapKafkaHostPort; - private String dmaapKafkaGroup; - private long dmaapKafkaTimeout; - private String[] dmaapKafkaExclude; - - private int dmaapCheckNewTopicInterval; //in millisecond - - private int kafkaConsumerCount; + private long checkTopicInterval; //in millisecond private String elasticsearchType; @@ -70,4 +65,12 @@ public class ApplicationConfiguration { //Version private String datalakeVersion; + + //Kibana + private String kibanaDashboardImportApi; + private Integer kibanaPort; + + //Elasticsearch + private String esTemplateMappingApi; + private Integer esPort; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java index 7e364332..cff29596 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java @@ -24,14 +24,13 @@ import java.util.*; import javax.servlet.http.HttpServletResponse; -import io.swagger.annotations.*; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.DesignType; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; -import org.onap.datalake.feeder.repository.TopicRepository; -import org.onap.datalake.feeder.service.DbService; -import org.onap.datalake.feeder.controller.domain.DbConfig; +import org.onap.datalake.feeder.dto.DbConfig; import org.onap.datalake.feeder.controller.domain.PostReturnBody; +import org.onap.datalake.feeder.repository.DesignTypeRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -39,15 +38,12 @@ import org.springframework.http.MediaType; import org.springframework.validation.BindingResult; import org.springframework.web.bind.annotation.*; -import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; /** * This controller manages the big data storage settings. All the settings are * saved in database. - * + * * @author Guobiao Mo * */ @@ -64,16 +60,12 @@ public class DbController { private DbRepository dbRepository; @Autowired - private TopicRepository topicRepository; - - @Autowired - private DbService dbService; + private DesignTypeRepository designTypeRepository; - //list all dbs + //list all dbs @GetMapping("") @ResponseBody @ApiOperation(value="Gat all databases name") - //public Iterable<Db> list() throws IOException { public List<String> list() throws IOException { Iterable<Db> ret = dbRepository.findAll(); List<String> retString = new ArrayList<>(); @@ -86,6 +78,21 @@ public class DbController { return retString; } + @GetMapping("/idAndName/{id}") + @ResponseBody + @ApiOperation(value="Get all databases id and name by designTypeId") + public Map<Integer, String> listIdAndName(@PathVariable String id) { + Optional<DesignType> designType = designTypeRepository.findById(id); + Map<Integer, String> map = new HashMap<>(); + if (designType.isPresent()) { + Set<Db> dbs = designType.get().getDbType().getDbs(); + for (Db item : dbs) { + map.put(item.getId(), item.getName()); + } + } + return map; + } + //Create a DB @PostMapping("") @ResponseBody @@ -96,11 +103,11 @@ public class DbController { return null; } - Db oldDb = dbService.getDb(dbConfig.getName()); +/* Db oldDb = dbService.getDb(dbConfig.getName()); if (oldDb != null) { sendError(response, 400, "Db already exists: " + dbConfig.getName()); return null; - } else { + } else {*/ Db newdb = new Db(); newdb.setName(dbConfig.getName()); newdb.setHost(dbConfig.getHost()); @@ -110,7 +117,7 @@ public class DbController { newdb.setPass(dbConfig.getPassword()); newdb.setEncrypt(dbConfig.isEncrypt()); - if(!dbConfig.getName().equals("Elecsticsearch") || !dbConfig.getName().equals("Druid")) + if(!dbConfig.getName().equals("Elecsticsearch") || dbConfig.getName().equals("Druid")) { newdb.setDatabase(new String(dbConfig.getDatabase())); } @@ -122,7 +129,7 @@ public class DbController { retBody.setReturnBody(retMsg); retBody.setStatusCode(200); return retBody; - } + //} } //Show a db @@ -132,10 +139,6 @@ public class DbController { @ResponseBody @ApiOperation(value="Get a database's details.") public Db getDb(@PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException { - /*Db db = dbService.getDb(dbName); - if (db == null) { - sendError(response, 404, "Db not found: " + dbName); - }*/ Db db = dbRepository.findByName(dbName); if (db == null) { sendError(response, 404, "Db not found: " + dbName); @@ -144,51 +147,6 @@ public class DbController { } - //Update Db - @PutMapping("/{dbName}") - @ResponseBody - @ApiOperation(value="Update a database.") - public PostReturnBody<DbConfig> updateDb(@PathVariable("dbName") String dbName, @RequestBody DbConfig dbConfig, BindingResult result, HttpServletResponse response) throws IOException { - - if (result.hasErrors()) { - sendError(response, 400, "Error parsing DB: " + result.toString()); - return null; - } - - if(!dbName.equals(dbConfig.getName())) - { - sendError(response, 400, "Mismatch DB name."); - return null; - } - - Db oldDb = dbService.getDb(dbConfig.getName()); - if (oldDb == null) { - sendError(response, 404, "Db not found: " + dbConfig.getName()); - return null; - } else { - oldDb.setName(dbConfig.getName()); - oldDb.setHost(dbConfig.getHost()); - oldDb.setPort(dbConfig.getPort()); - oldDb.setEnabled(dbConfig.isEnabled()); - oldDb.setLogin(dbConfig.getLogin()); - oldDb.setPass(dbConfig.getPassword()); - oldDb.setEncrypt(dbConfig.isEncrypt()); - - if(!oldDb.getName().equals("Elecsticsearch") || !oldDb.getName().equals("Druid")) - { - oldDb.setDatabase(dbConfig.getDatabase()); - } - dbRepository.save(oldDb); - DbConfig retMsg; - PostReturnBody<DbConfig> retBody = new PostReturnBody<>(); - retMsg = new DbConfig(); - composeRetMessagefromDbConfig(oldDb, retMsg); - retBody.setReturnBody(retMsg); - retBody.setStatusCode(200); - return retBody; - } - } - //Delete a db //the topics are missing in the return, since in we use @JsonBackReference on Db's topics //need to the the following method to retrieve the topic list @@ -214,20 +172,56 @@ public class DbController { @ResponseBody @ApiOperation(value="Get a database's all topics.") public Set<Topic> getDbTopics(@PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException { - //Db db = dbService.getDb(dbName); Set<Topic> topics; try { Db db = dbRepository.findByName(dbName); topics = db.getTopics(); - }catch(Exception ex) - { + } catch(Exception ex) { sendError(response, 404, "DB: " + dbName + " or Topics not found"); - return null; + return Collections.emptySet(); } return topics; } + //Update Db + @PutMapping("") + @ResponseBody + @ApiOperation(value="Update a database.") + public PostReturnBody<DbConfig> updateDb(@RequestBody DbConfig dbConfig, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DB: " + result.toString()); + return null; + } + + Db oldDb = dbRepository.findById(dbConfig.getId()).get(); + if (oldDb == null) { + sendError(response, 404, "Db not found: " + dbConfig.getName()); + return null; + } else { + oldDb.setHost(dbConfig.getHost()); + oldDb.setPort(dbConfig.getPort()); + oldDb.setEnabled(dbConfig.isEnabled()); + oldDb.setLogin(dbConfig.getLogin()); + oldDb.setPass(dbConfig.getPassword()); + oldDb.setEncrypt(dbConfig.isEncrypt()); + if (!oldDb.getName().equals("Elecsticsearch") || !oldDb.getName().equals("Druid")) { + oldDb.setDatabase(dbConfig.getDatabase()); + } + + dbRepository.save(oldDb); + DbConfig retMsg; + PostReturnBody<DbConfig> retBody = new PostReturnBody<>(); + retMsg = new DbConfig(); + composeRetMessagefromDbConfig(oldDb, retMsg); + retBody.setReturnBody(retMsg); + retBody.setStatusCode(200); + return retBody; + } + + } + @PostMapping("/verify") @ResponseBody diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java new file mode 100644 index 00000000..ebe60502 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignController.java @@ -0,0 +1,168 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.controller; + +import org.onap.datalake.feeder.controller.domain.PostReturnBody; +import org.onap.datalake.feeder.domain.Design; +import org.onap.datalake.feeder.dto.DesignConfig; +import org.onap.datalake.feeder.repository.DesignRepository; +import org.onap.datalake.feeder.service.DesignService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.*; + +import io.swagger.annotations.ApiOperation; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import javax.servlet.http.HttpServletResponse; + + +/** + * This controller manages design settings + * + * @author guochunmeng + */ +@RestController +@RequestMapping(value = "/designs", produces = MediaType.APPLICATION_JSON_VALUE) +public class DesignController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DesignRepository designRepository; + + @Autowired + private DesignService designService; + + @PostMapping("") + @ResponseBody + @ApiOperation(value="Create a design.") + public PostReturnBody<DesignConfig> createDesign(@RequestBody DesignConfig designConfig, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DesignConfig: "+result.toString()); + return null; + } + + Design design = null; + try { + design = designService.fillDesignConfiguration(designConfig); + } catch (Exception e) { + log.debug("FillDesignConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillDesignConfiguration: "+e.getMessage()); + return null; + } + designRepository.save(design); + log.info("Design save successed"); + return mkPostReturnBody(200, design); + } + + + @PutMapping("{id}") + @ResponseBody + @ApiOperation(value="Update a design.") + public PostReturnBody<DesignConfig> updateDesign(@RequestBody DesignConfig designConfig, BindingResult result, @PathVariable Integer id, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DesignConfig: "+result.toString()); + return null; + } + + Design design = designService.getDesign(id); + if (design != null) { + try { + designService.fillDesignConfiguration(designConfig, design); + } catch (Exception e) { + log.debug("FillDesignConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillDesignConfiguration: "+e.getMessage()); + return null; + } + designRepository.save(design); + log.info("Design update successed"); + return mkPostReturnBody(200, design); + } else { + sendError(response, 400, "Design not found: "+id); + return null; + } + + } + + + @DeleteMapping("/{id}") + @ResponseBody + @ApiOperation(value="delete a design.") + public void deleteDesign(@PathVariable("id") Integer id, HttpServletResponse response) throws IOException{ + + Design oldDesign = designService.getDesign(id); + if (oldDesign == null) { + sendError(response, 400, "design not found "+id); + } else { + designRepository.delete(oldDesign); + response.setStatus(204); + } + } + + + @GetMapping("") + @ResponseBody + @ApiOperation(value="List all Designs") + public List<DesignConfig> queryAllDesign(){ + return designService.queryAllDesign(); + } + + + @PostMapping("/deploy/{id}") + @ResponseBody + @ApiOperation(value="Design deploy") + public Map<Integer, Boolean> deployDesign(@PathVariable Integer id, HttpServletResponse response) throws IOException { + + Optional<Design> designOptional = designRepository.findById(id); + if (designOptional.isPresent()) { + Design design = designOptional.get(); + return designService.deploy(design); + } else { + sendError(response, 400, "Design is null"); + return new HashMap<>(); + } + } + + + private PostReturnBody<DesignConfig> mkPostReturnBody(int statusCode, Design design) { + PostReturnBody<DesignConfig> retBody = new PostReturnBody<>(); + retBody.setStatusCode(statusCode); + retBody.setReturnBody(design.getDesignConfig()); + return retBody; + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } + +}
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java new file mode 100755 index 00000000..35d206bb --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DesignTypeController.java @@ -0,0 +1,54 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.DesignType;
+import org.onap.datalake.feeder.dto.DesignTypeConfig;
+import org.onap.datalake.feeder.service.DesignTypeService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+
+import io.swagger.annotations.ApiOperation;
+
+/**
+ * This controller manages designType settings
+ *
+ * @author guochunmeng
+ */
+@RestController
+@RequestMapping(value = "/designTypes", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DesignTypeController {
+
+ @Autowired
+ private DesignTypeService designTypeService;
+
+ @GetMapping("")
+ @ResponseBody
+ @ApiOperation(value="List all designTypes")
+ public List<DesignTypeConfig> getDesignType() {
+ return designTypeService.getDesignTypes();
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java index 6a44c4f2..d9080ec0 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java @@ -58,9 +58,12 @@ public class FeederController { @ResponseBody @ApiOperation(value="Start pulling data.") public String start() throws IOException { - log.info("DataLake feeder starting to pull data from DMaaP..."); + log.info("Going to start DataLake feeder ..."); if(pullService.isRunning() == false) { pullService.start(); + log.info("DataLake feeder started."); + }else { + log.info("DataLake feeder already started."); } return "{\"running\": true}"; } @@ -72,11 +75,14 @@ public class FeederController { @ResponseBody @ApiOperation(value="Stop pulling data.") public String stop() { + log.info("Going to stop DataLake feeder ..."); if(pullService.isRunning() == true) { pullService.shutdown(); + log.info("DataLake feeder is stopped."); + }else { + log.info("DataLake feeder already stopped."); } - log.info("DataLake feeder is stopped."); return "{\"running\": false}"; } /** @@ -86,7 +92,7 @@ public class FeederController { @ApiOperation(value="Retrieve feeder status.") public String status() { String status = "Feeder is running: "+pullService.isRunning(); - log.info("sending feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. + log.info("sending feeder status ..." + status);//TODO we can send what topics are monitored, how many messages are sent, etc. return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}"; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java new file mode 100644 index 00000000..8d1bf316 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java @@ -0,0 +1,149 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.controller; + +import io.swagger.annotations.ApiOperation; +import org.onap.datalake.feeder.controller.domain.PostReturnBody; +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.dto.KafkaConfig; +import org.onap.datalake.feeder.repository.KafkaRepository; +import org.onap.datalake.feeder.service.KafkaService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; + +/** + * This controller manages kafka settings + * + * @author guochunmeng + */ +@RestController +@RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class KafkaController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private KafkaService kafkaService; + + @Autowired + private KafkaRepository kafkaRepository; + + @PostMapping("") + @ResponseBody + @ApiOperation(value="Create a kafka.") + public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); + return null; + } + + Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId()); + + if (oldKafka != null) { + sendError(response, 400, "kafka is exist "+kafkaConfig.getId()); + return null; + } else { + Kafka kafka = null; + try { + kafka = kafkaService.fillKafkaConfiguration(kafkaConfig); + } catch (Exception e) { + log.debug("FillKafkaConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage()); + return null; + } + kafkaRepository.save(kafka); + log.info("Kafka save successed"); + return mkPostReturnBody(200, kafka); + } + } + + @PutMapping("/{id}") + @ResponseBody + @ApiOperation(value="Update a kafka.") + public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); + return null; + } + + Kafka oldKafka = kafkaService.getKafkaById(id); + + if (oldKafka == null) { + sendError(response, 400, "Kafka not found: "+id); + return null; + } else { + try { + kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka); + } catch (Exception e) { + log.debug("FillKafkaConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage()); + return null; + } + kafkaRepository.save(oldKafka); + log.info("kafka update successed"); + return mkPostReturnBody(200, oldKafka); + } + } + + @DeleteMapping("/{id}") + @ResponseBody + @ApiOperation(value="delete a kafka.") + public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{ + + Kafka oldKafka = kafkaService.getKafkaById(id); + if (oldKafka == null) { + sendError(response, 400, "kafka not found "+id); + } else { + kafkaRepository.delete(oldKafka); + response.setStatus(204); + } + } + + @GetMapping("") + @ResponseBody + @ApiOperation(value="List all Kafkas") + public List<KafkaConfig> queryAllKafka(){ + return kafkaService.getAllKafka(); + } + + private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) { + PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>(); + retBody.setStatusCode(statusCode); + retBody.setReturnBody(kafka.getKafkaConfig()); + return retBody; + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 88f573a1..b59b2a7b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -27,17 +27,18 @@ import java.util.Set; import javax.servlet.http.HttpServletResponse; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.controller.domain.PostReturnBody; import org.onap.datalake.feeder.dto.TopicConfig; -import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.onap.datalake.feeder.repository.TopicRepository; -import org.onap.datalake.feeder.service.DbService; import org.onap.datalake.feeder.service.DmaapService; import org.onap.datalake.feeder.service.TopicService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.http.MediaType; import org.springframework.validation.BindingResult; import org.springframework.web.bind.annotation.DeleteMapping; @@ -50,6 +51,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; + import io.swagger.annotations.ApiOperation; /** @@ -71,18 +73,23 @@ public class TopicController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired - private DmaapService dmaapService; + private ApplicationContext context; @Autowired + private KafkaRepository kafkaRepository; + + @Autowired private TopicRepository topicRepository; @Autowired private TopicService topicService; - @GetMapping("/dmaap") + @GetMapping("/dmaap/{kafkaId}") @ResponseBody @ApiOperation(value = "List all topic names in DMaaP.") - public List<String> listDmaapTopics() { + public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) { + Kafka kafka = kafkaRepository.findById(kafkaId).get(); + DmaapService dmaapService = context.getBean(DmaapService.class, kafka); return dmaapService.getTopics(); } @@ -94,7 +101,7 @@ public class TopicController { List<String> retString = new ArrayList<>(); for(Topic item : ret) { - if(!topicService.istDefaultTopic(item)) + if(!topicService.isDefaultTopic(item)) retString.add(item.getName()); } return retString; @@ -109,24 +116,25 @@ public class TopicController { sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - Topic oldTopic = topicService.getTopic(topicConfig.getName()); + /*Topic oldTopic = topicService.getTopic(topicConfig.getName()); if (oldTopic != null) { sendError(response, 400, "Topic already exists "+topicConfig.getName()); return null; - } else { + } else {*/ Topic wTopic = topicService.fillTopicConfiguration(topicConfig); if(wTopic.getTtl() == 0) wTopic.setTtl(3650); topicRepository.save(wTopic); return mkPostReturnBody(200, wTopic); - } + //} + //FIXME need to connect to Kafka } - @GetMapping("/{topicName}") + @GetMapping("/{topicId}") @ResponseBody @ApiOperation(value="Get a topic's settings.") - public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException { - Topic topic = topicService.getTopic(topicName); + public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicId); if(topic == null) { sendError(response, 404, "Topic not found"); return null; @@ -136,23 +144,23 @@ public class TopicController { //This is not a partial update: old topic is wiped out, and new topic is created based on the input json. //One exception is that old DBs are kept - @PutMapping("/{topicName}") + @PutMapping("/{topicId}") @ResponseBody @ApiOperation(value="Update a topic.") - public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException { + public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException { if (result.hasErrors()) { sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - if(!topicName.equals(topicConfig.getName())) + if(topicId!=topicConfig.getId()) { - sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName()); + sendError(response, 400, "Topic name mismatch" + topicId + topicConfig); return null; } - Topic oldTopic = topicService.getTopic(topicConfig.getName()); + Topic oldTopic = topicService.getTopic(topicId); if (oldTopic == null) { sendError(response, 404, "Topic not found "+topicConfig.getName()); return null; @@ -163,14 +171,14 @@ public class TopicController { } } - @DeleteMapping("/{topicName}") + @DeleteMapping("/{topicId}") @ResponseBody - @ApiOperation(value="Update a topic.") - public void deleteTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException + @ApiOperation(value="Delete a topic.") + public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException { - Topic oldTopic = topicService.getTopic(topicName); + Topic oldTopic = topicService.getTopic(topicId); if (oldTopic == null) { - sendError(response, 404, "Topic not found "+topicName); + sendError(response, 404, "Topic not found "+topicId); } else { Set<Db> dbRelation = oldTopic.getDbs(); dbRelation.clear(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java index da1f6cab..cfd2462b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -24,11 +24,17 @@ import java.util.Set; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.FetchType; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.JoinTable; import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; import javax.persistence.Table; + +import org.onap.datalake.feeder.enumeration.DbTypeEnum; + import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; @@ -46,10 +52,14 @@ import lombok.Setter; @Table(name = "db") public class Db { @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "`id`") + private int id; + @Column(name="`name`") private String name; - @Column(name="`enabled`") + @Column(name="`enabled`", nullable = false) private boolean enabled; @Column(name="`host`") @@ -79,19 +89,49 @@ public class Db { @Column(name="`property3`") private String property3; + @ManyToOne(fetch = FetchType.EAGER) + @JoinColumn(name = "db_type_id", nullable = false) + private DbType dbType; + @JsonBackReference @ManyToMany(fetch = FetchType.EAGER) @JoinTable( name = "map_db_topic", - joinColumns = { @JoinColumn(name="db_name") }, - inverseJoinColumns = { @JoinColumn(name="topic_name") } + joinColumns = { @JoinColumn(name="db_id") }, + inverseJoinColumns = { @JoinColumn(name="topic_id") } ) - protected Set<Topic> topics; + private Set<Topic> topics; + + public boolean isTool() { + return dbType.isTool(); + } + + public boolean isHdfs() { + return isDb(DbTypeEnum.HDFS); + } + + public boolean isElasticsearch() { + return isDb(DbTypeEnum.ES); + } + + public boolean isCouchbase() { + return isDb(DbTypeEnum.CB); + } - public Db() { + public boolean isDruid() { + return isDb(DbTypeEnum.DRUID); } - public Db(String name) { - this.name = name; + public boolean isMongoDB() { + return isDb(DbTypeEnum.MONGO); + } + + private boolean isDb(DbTypeEnum dbTypeEnum) { + return dbTypeEnum.equals(DbTypeEnum.valueOf(dbType.getId())); + } + + @Override + public String toString() { + return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled); } @Override @@ -102,11 +142,11 @@ public class Db { if (this.getClass() != obj.getClass()) return false; - return name.equals(((Db) obj).getName()); + return id==((Db) obj).getId(); } @Override public int hashCode() { - return name.hashCode(); + return id; } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java new file mode 100644 index 00000000..9c83a9cd --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java @@ -0,0 +1,92 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.HashSet; +import java.util.Set; + +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.OneToMany; +import javax.persistence.Table; +import lombok.Getter; +import lombok.Setter; + + +/** + * Domain class representing bid data storage type + * + * @author Guobiao Mo + * + */ +@Setter +@Getter +@Entity +@Table(name = "db_type") +public class DbType { + @Id + @Column(name="`id`") + private String id; + + @Column(name="`name`", nullable = false) + private String name; + + @Column(name="`default_port`") + private Integer defaultPort; + + @Column(name="`tool`", nullable = false) + private boolean tool; + + @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "dbType") + protected Set<Db> dbs = new HashSet<>(); + + public DbType() { + } + + public DbType(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public String toString() { + return String.format("DbType %s (name=%s)", id, name); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return id.equals(((DbType) obj).getId()); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java new file mode 100644 index 00000000..faf3755e --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Design.java @@ -0,0 +1,104 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.domain; + +import com.fasterxml.jackson.annotation.JsonBackReference; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import javax.persistence.*; + +import org.onap.datalake.feeder.dto.DesignConfig; + +/** + * Domain class representing design + * + * @author guochunmeng + */ + +@Getter +@Setter +@Entity +@Table(name = "design") +public class Design { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "`id`") + private Integer id; + + @Column(name = "`name`") + private String name; + + @ManyToOne(fetch = FetchType.EAGER) + @JoinColumn(name = "topic_name_id", nullable = false) + private TopicName topicName;//topic name + + @Column(name = "`submitted`") + private Boolean submitted; + + @Column(name = "`body`") + private String body; + + @Column(name = "`note`") + private String note; + + @ManyToOne(fetch=FetchType.EAGER) + @JoinColumn(name = "design_type_id", nullable = false) + @JsonBackReference + private DesignType designType; + + //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) + @JsonBackReference + //@JsonManagedReference + @ManyToMany(fetch = FetchType.EAGER) + @JoinTable(name = "map_db_design", joinColumns = { @JoinColumn(name = "design_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") }) + protected Set<Db> dbs; + + public DesignConfig getDesignConfig() { + + DesignConfig designConfig = new DesignConfig(); + + designConfig.setId(getId()); + designConfig.setBody(getBody()); + designConfig.setName(getName()); + designConfig.setNote(getNote()); + designConfig.setSubmitted(getSubmitted()); + designConfig.setTopicName(getTopicName().getId()); + designConfig.setDesignType(getDesignType().getId()); + designConfig.setDesignTypeName(getDesignType().getName()); + + Set<Db> designDb = getDbs(); + List<Integer> dbList = new ArrayList<>(); + if (designDb != null) { + for (Db item : designDb) { + dbList.add(item.getId()); + } + } + designConfig.setDbs(dbList); + + return designConfig; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java new file mode 100644 index 00000000..14026fe0 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DesignType.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.domain; + + +import com.fasterxml.jackson.annotation.JsonBackReference; +import lombok.Getter; +import lombok.Setter; +import org.onap.datalake.feeder.dto.DesignTypeConfig; + +import java.util.HashSet; +import java.util.Set; + +import javax.persistence.*; + +/** + * Domain class representing design_type + * + * @author guochunmeng + */ +@Getter +@Setter +@Entity +@Table(name = "design_type") +public class DesignType { + + @Id + @Column(name = "`id`") + private String id; + + @Column(name = "`name`") + private String name; + + @ManyToOne(fetch=FetchType.LAZY) + @JoinColumn(name="db_type_id", nullable = false) + @JsonBackReference + private DbType dbType; + + @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "designType") + protected Set<Design> designs = new HashSet<>(); + + @Column(name = "`note`") + private String note; + + public DesignTypeConfig getDesignTypeConfig() { + + DesignTypeConfig designTypeConfig = new DesignTypeConfig(); + + designTypeConfig.setId(getId()); + designTypeConfig.setName(getName()); + + return designTypeConfig; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java new file mode 100644 index 00000000..df7aad04 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java @@ -0,0 +1,64 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +/** + * A warper of parent Topic + * + * @author Guobiao Mo + * + */ + +public class EffectiveTopic { + private Topic topic; //base Topic + + String name; + + public EffectiveTopic(Topic baseTopic) { + topic = baseTopic; + } + + public EffectiveTopic(Topic baseTopic, String name ) { + topic = baseTopic; + this.name = name; + } + + public String getName() { + return name==null?topic.getName():name; + } + + public void setName(String name) { + this.name = name; + } + + public Topic getTopic() { + return topic; + } + + public void setTopic(Topic topic) { + this.topic = topic; + } + + @Override + public String toString() { + return String.format("EffectiveTopic %s (base Topic=%s)", getName(), topic.toString()); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java new file mode 100644 index 00000000..de80db70 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java @@ -0,0 +1,147 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.Set; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.JoinTable; +import javax.persistence.ManyToMany; +import javax.persistence.Table; +import com.fasterxml.jackson.annotation.JsonBackReference; +import lombok.Getter; +import lombok.Setter; +import org.onap.datalake.feeder.dto.KafkaConfig; + + +/** + * Domain class representing Kafka cluster + * + * @author Guobiao Mo + * + */ +@Setter +@Getter +@Entity +@Table(name = "kafka") +public class Kafka { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "`id`") + private int id; + + @Column(name="`name`", nullable = false) + private String name; + + @Column(name="`enabled`", nullable = false) + private boolean enabled; + + @Column(name="broker_list", nullable = false) + private String brokerList;//message-router-kafka:9092,message-router-kafka2:9092 + + @Column(name="`zk`", nullable = false) + private String zooKeeper;//message-router-zookeeper:2181 + + @Column(name="`group`", columnDefinition = "varchar(255) DEFAULT 'datalake'") + private String group; + + @Column(name="`secure`", columnDefinition = " bit(1) DEFAULT 0") + private boolean secure; + + @Column(name="`login`") + private String login; + + @Column(name="`pass`") + private String pass; + + @Column(name="`security_protocol`") + private String securityProtocol; + + //by default, all topics started with '__' are excluded, here one can explicitly include them + //example: '__consumer_offsets,__transaction_state' + @Column(name="`included_topic`") + private String includedTopic; + + @Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'") + private String excludedTopic; + + @Column(name="`consumer_count`", columnDefinition = "integer default 3") + private Integer consumerCount; + + //don't show this field in admin UI + @Column(name="`timeout_sec`", columnDefinition = "integer default 10") + private Integer timeout; + + @JsonBackReference + @ManyToMany(fetch = FetchType.EAGER) + @JoinTable( name = "map_kafka_topic", + joinColumns = { @JoinColumn(name="kafka_id") }, + inverseJoinColumns = { @JoinColumn(name="topic_id") } + ) + private Set<Topic> topics; + + @Override + public String toString() { + return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return id == ((Kafka) obj).getId(); + } + + @Override + public int hashCode() { + return id; + } + + public KafkaConfig getKafkaConfig() { + KafkaConfig kafkaConfig = new KafkaConfig(); + + kafkaConfig.setId(getId()); + kafkaConfig.setBrokerList(getBrokerList()); + kafkaConfig.setConsumerCount(getConsumerCount()); + kafkaConfig.setEnabled(isEnabled()); + kafkaConfig.setExcludedTopic(getExcludedTopic()); + kafkaConfig.setGroup(getGroup()); + kafkaConfig.setIncludedTopic(getIncludedTopic()); + kafkaConfig.setLogin(getLogin()); + kafkaConfig.setName(getName()); + kafkaConfig.setPass(getPass()); + kafkaConfig.setSecure(isSecure()); + kafkaConfig.setSecurityProtocol(getSecurityProtocol()); + kafkaConfig.setTimeout(getTimeout()); + kafkaConfig.setZooKeeper(getZooKeeper()); + + return kafkaConfig; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index acb48aef..5d0c7625 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -20,6 +20,7 @@ package org.onap.datalake.feeder.domain; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,9 +31,13 @@ import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.JoinTable; import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; import javax.persistence.Table; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONObject; import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.enumeration.DataFormat; import com.fasterxml.jackson.annotation.JsonBackReference; @@ -51,9 +56,13 @@ import lombok.Setter; @Table(name = "topic") public class Topic { @Id - @Column(name = "`name`") - private String name;//topic name + @Column(name = "`id`") + private Integer id; + @ManyToOne(fetch = FetchType.EAGER) + @JoinColumn(name = "topic_name_id", nullable = false) + private TopicName topicName;//topic name + //for protected Kafka topics @Column(name = "`login`") private String login; @@ -65,65 +74,59 @@ public class Topic { @JsonBackReference //@JsonManagedReference @ManyToMany(fetch = FetchType.EAGER) - @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") }) - protected Set<Db> dbs; + @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") }) + protected Set<Db> dbs=new HashSet<>(); + + @ManyToMany(fetch = FetchType.EAGER) + @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") }) + protected Set<Kafka> kafkas=new HashSet<>(); /** * indicate if we should monitor this topic */ - @Column(name = "`enabled`") - private Boolean enabled; + @Column(name = "`enabled`", nullable = false) + private boolean enabled; /** * save raw message text */ - @Column(name = "`save_raw`") - private Boolean saveRaw; + @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0") + private boolean saveRaw; /** * need to explicitly tell feeder the data format of the message. support JSON, * XML, YAML, TEXT */ @Column(name = "`data_format`") - private String dataFormat; + protected String dataFormat; /** * TTL in day */ + @Column(name = "`ttl_day`") private Integer ttl; //if this flag is true, need to correlate alarm cleared message to previous alarm - @Column(name = "`correlate_cleared_message`") - private Boolean correlateClearedMessage; + @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0") + private boolean correlateClearedMessage; //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name" @Column(name = "`message_id_path`") - private String messageIdPath; + protected String messageIdPath; //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray" - @Column(name = "`aggregate_array_path`") - private String aggregateArrayPath; + @Column(name = "`aggregate_array_path`") + protected String aggregateArrayPath; //paths to the element in array that need flatten, this element is used as label, comma separated, //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..." - @Column(name = "`flatten_array_path`") - private String flattenArrayPath; + @Column(name = "`flatten_array_path`") + protected String flattenArrayPath; - public Topic() { - } - - public Topic(String name) { - this.name = name; - } - - public boolean isEnabled() { - return is(enabled); + public String getName() { + return topicName.getId(); } - - public boolean isCorrelateClearedMessage() { - return is(correlateClearedMessage); - } - + public int getTtl() { if (ttl != null) { return ttl; @@ -132,25 +135,58 @@ public class Topic { } } - private boolean is(Boolean b) { - return is(b, false); + public DataFormat getDataFormat2() { + if (dataFormat != null) { + return DataFormat.fromString(dataFormat); + } else { + return null; + } } - private boolean is(Boolean b, boolean defaultValue) { - if (b != null) { - return b; - } else { - return defaultValue; + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); + } + + return ret; } - public boolean isSaveRaw() { - return is(saveRaw); + //extract DB id from JSON attributes, support multiple attributes + public String getMessageId(JSONObject json) { + String ret = null; + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + ret = sb.toString(); + } + + return ret; } public TopicConfig getTopicConfig() { TopicConfig tConfig = new TopicConfig(); + tConfig.setId(getId()); tConfig.setName(getName()); tConfig.setLogin(getLogin()); tConfig.setEnabled(isEnabled()); @@ -161,21 +197,35 @@ public class Topic { tConfig.setAggregateArrayPath(getAggregateArrayPath()); tConfig.setFlattenArrayPath(getFlattenArrayPath()); tConfig.setTtl(getTtl()); + Set<Db> topicDb = getDbs(); List<String> dbList = new ArrayList<>(); + List<String> enabledDbList = new ArrayList<>(); if (topicDb != null) { for (Db item : topicDb) { dbList.add(item.getName()); + if(item.isEnabled()) { + enabledDbList.add(item.getName()); + } } } tConfig.setSinkdbs(dbList); + tConfig.setEnabledSinkdbs(enabledDbList); + Set<Kafka> topicKafka = getKafkas(); + List<Integer> kafkaList = new ArrayList<>(); + if (topicKafka != null) { + for (Kafka kafka : topicKafka) { + kafkaList.add(kafka.getId()); + } + } + tConfig.setKafkas(kafkaList); return tConfig; } @Override public String toString() { - return name; + return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas); } @Override @@ -186,12 +236,12 @@ public class Topic { if (this.getClass() != obj.getClass()) return false; - return name.equals(((Topic) obj).getName()); + return id.equals(((Topic) obj).getId()); } @Override public int hashCode() { - return name.hashCode(); + return id; } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java new file mode 100644 index 00000000..83227ada --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/TopicName.java @@ -0,0 +1,86 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + + +import java.util.Set; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.OneToMany; +import javax.persistence.Table; + +import lombok.Getter; +import lombok.Setter; + +/** + * Domain class representing unique topic names + * + * @author Guobiao Mo + * + */ +@Setter +@Getter +@Entity +@Table(name = "topic_name") +public class TopicName { + @Id + @Column(name = "`id`") + private String id;//topic name + + + @OneToMany(fetch = FetchType.LAZY, mappedBy = "topicName") + protected Set<Design> designs; + + + @OneToMany(fetch = FetchType.LAZY, mappedBy = "topicName") + protected Set<Topic> topics; + + public TopicName() { + } + + public TopicName(String name) { + id = name; + } + + @Override + public String toString() { + return "TopicName "+ id; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return id.equals(((TopicName) obj).getId()); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java index 557b545c..eff87114 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.controller.domain; +package org.onap.datalake.feeder.dto; import lombok.Getter; @@ -33,6 +33,7 @@ import lombok.Setter; @Getter @Setter public class DbConfig { + private int id; private String name; private String host; private boolean enabled; @@ -40,6 +41,6 @@ public class DbConfig { private String password; private boolean encrypt; private String database; - private int port; + private Integer port; private String poperties; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java new file mode 100755 index 00000000..34256004 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignConfig.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.dto; + +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +/** + * JSON request body for portalDesign Config. + * + * @author guochunmeng + */ + +@Getter +@Setter +public class DesignConfig { + + private Integer id; + private String name; + private Boolean submitted; + private String body; + private String note; + private String topicName; + private String designType; + private String designTypeName;//UI show name + private List<Integer> dbs; + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java new file mode 100644 index 00000000..ddedf38b --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DesignTypeConfig.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 QCT + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.dto; + +import lombok.Getter; +import lombok.Setter; + +/** + * JSON request body for DesignType Config. + * + * @author guochunmeng + * + */ +@Setter +@Getter +public class DesignTypeConfig { + + private String id; + private String name; + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java new file mode 100644 index 00000000..f5e9539c --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 QCT + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.dto; + +import lombok.Getter; +import lombok.Setter; + +/** + * JSON request body for Kafka Config. + * + * @author guochunmeng + * + */ +@Getter +@Setter +public class KafkaConfig { + + private int id; + + private String name; + + private boolean enabled; + + private String brokerList; + + private String zooKeeper; + + private String group; + + private boolean secure; + + private String login; + + private String pass; + + private String securityProtocol; + + private String includedTopic; + + private String excludedTopic; + + private Integer consumerCount; + + private Integer timeout; + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 8dfe1b16..6a262ca8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -25,10 +25,6 @@ import lombok.Setter; import java.util.List; -import org.apache.commons.lang3.StringUtils; -import org.json.JSONObject; -import org.onap.datalake.feeder.enumeration.DataFormat; - /** * JSON request body for Topic manipulation. * @@ -41,10 +37,12 @@ import org.onap.datalake.feeder.enumeration.DataFormat; public class TopicConfig { + private int id; private String name; private String login; private String password; private List<String> sinkdbs; + private List<String> enabledSinkdbs;//only include enabled db private boolean enabled; private boolean saveRaw; private String dataFormat; @@ -53,82 +51,11 @@ public class TopicConfig { private String messageIdPath; private String aggregateArrayPath; private String flattenArrayPath; - - public DataFormat getDataFormat2() { - if (dataFormat != null) { - return DataFormat.fromString(dataFormat); - } else { - return null; - } - } - - public boolean supportHdfs() { - return containDb("HDFS"); - } - - public boolean supportElasticsearch() { - return containDb("Elasticsearch");//TODO string hard codes - } - - public boolean supportCouchbase() { - return containDb("Couchbase"); - } - - public boolean supportDruid() { - return containDb("Druid"); - } - - public boolean supportMongoDB() { - return containDb("MongoDB"); - } - - private boolean containDb(String dbName) { - return (sinkdbs != null && sinkdbs.contains(dbName)); - } - - //extract DB id from JSON attributes, support multiple attributes - public String getMessageId(JSONObject json) { - String id = null; - - if (StringUtils.isNotBlank(messageIdPath)) { - String[] paths = messageIdPath.split(","); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < paths.length; i++) { - if (i > 0) { - sb.append('^'); - } - sb.append(json.query(paths[i]).toString()); - } - id = sb.toString(); - } - - return id; - } - - public String[] getAggregateArrayPath2() { - String[] ret = null; - - if (StringUtils.isNotBlank(aggregateArrayPath)) { - ret = aggregateArrayPath.split(","); - } - - return ret; - } - - public String[] getFlattenArrayPath2() { - String[] ret = null; - - if (StringUtils.isNotBlank(flattenArrayPath)) { - ret = flattenArrayPath.split(","); - } - - return ret; - } - + private List<Integer> kafkas; + @Override public String toString() { - return name; + return String.format("TopicConfig %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs); } @Override @@ -139,12 +66,12 @@ public class TopicConfig { if (this.getClass() != obj.getClass()) return false; - return name.equals(((TopicConfig) obj).getName()); + return id==((TopicConfig) obj).getId(); } @Override public int hashCode() { - return name.hashCode(); + return id; } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java new file mode 100644 index 00000000..39d02d36 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java @@ -0,0 +1,54 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; + +/** + * Database type + * + * @author Guobiao Mo + * + */ +public enum DbTypeEnum { + CB("Couchbase", CouchbaseService.class) + , DRUID("Druid", null) + , ES("Elasticsearch", ElasticsearchService.class) + , HDFS("HDFS", HdfsService.class) + , MONGO("MongoDB", MongodbService.class) + , KIBANA("Kibana", null) + , SUPERSET("Superset", null); + + private final String name; + private final Class<? extends DbStoreService> serviceClass; + + DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) { + this.name = name; + this.serviceClass = serviceClass; + } + + public Class<? extends DbStoreService> getServiceClass(){ + return serviceClass; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java new file mode 100644 index 00000000..157fbf94 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java @@ -0,0 +1,38 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +/** + * Design type + * + * @author Guobiao Mo + * + */ +public enum DesignTypeEnum { + KIBANA_DB("Kibana Dashboard"), KIBANA_SEARCH("Kibana Search"), KIBANA_VISUAL("Kibana Visualization"), + ES_MAPPING("Elasticsearch Field Mapping Template"), DRUID_KAFKA_SPEC("Druid Kafka Indexing Service Supervisor Spec"); + + private final String name; + + DesignTypeEnum(String name) { + this.name = name; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java index b09dcdca..a744da6f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java @@ -31,7 +31,7 @@ import org.springframework.data.repository.CrudRepository; *
*/
-public interface DbRepository extends CrudRepository<Db, String> {
+public interface DbRepository extends CrudRepository<Db, Integer> {
Db findByName(String Name);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java new file mode 100644 index 00000000..b93cb1d1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbTypeRepository.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.DbType; +import org.springframework.data.repository.CrudRepository; + +/** + * DbTypeEnum Repository + * + * @author Guobiao Mo + */ + +public interface DbTypeRepository extends CrudRepository<DbType, String> { + + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java new file mode 100644 index 00000000..f144e905 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignRepository.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.Design; +import org.springframework.data.repository.CrudRepository; + +/** + * Design Repository + * + * @author guochunmeng + */ + +public interface DesignRepository extends CrudRepository<Design, Integer> { + + Design findByName(String name); + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java new file mode 100755 index 00000000..e7ab48a2 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DesignTypeRepository.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.DesignType; +import org.springframework.data.repository.CrudRepository; + +/** + * DesignType Repository + * + * @author guochunmeng + */ + +public interface DesignTypeRepository extends CrudRepository<DesignType, String> { + + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java new file mode 100644 index 00000000..6ce23ba7 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java @@ -0,0 +1,35 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Kafka;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * Kafka Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java new file mode 100644 index 00000000..9f8ea8a9 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java @@ -0,0 +1,35 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.TopicName;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * TopicName Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface TopicNameRepository extends CrudRepository<TopicName, String> {
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java index 2d9adef8..8f72dfed 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -31,6 +31,6 @@ import org.springframework.data.repository.CrudRepository; *
*/
-public interface TopicRepository extends CrudRepository<Topic, String> {
-
+public interface TopicRepository extends CrudRepository<Topic, Integer> {
+ //List<Topic> findByTopicName(String topicStr);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index 58bb433a..d54bf3f4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -20,11 +20,17 @@ package org.onap.datalake.feeder.service; -import java.util.Optional; +import java.util.HashMap; +import java.util.Map; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.domain.DbType; +import org.onap.datalake.feeder.enumeration.DbTypeEnum; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** @@ -35,33 +41,32 @@ import org.springframework.stereotype.Service; */ @Service public class DbService { + private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired - private DbRepository dbRepository; + private ApplicationContext context; - public Db getDb(String name) { - Optional<Db> ret = dbRepository.findById(name); - return ret.isPresent() ? ret.get() : null; - } - - public Db getCouchbase() { - return getDb("Couchbase"); - } - - public Db getElasticsearch() { - return getDb("Elasticsearch"); - } + private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>(); - public Db getMongoDB() { - return getDb("MongoDB"); - } + public DbStoreService findDbStoreService(Db db) { + int dbId = db.getId(); + if (dbStoreServiceMap.containsKey(dbId)) { + return dbStoreServiceMap.get(dbId); + } - public Db getDruid() { - return getDb("Druid"); - } + DbType dbType = db.getDbType(); + DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId()); + Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass(); + + if (serviceClass == null) { + log.error("Should not have come here {}", db); + dbStoreServiceMap.put(dbId, null); + return null; + } + + DbStoreService ret = context.getBean(serviceClass, db); + dbStoreServiceMap.put(dbId, ret); - public Db getHdfs() { - return getDb("HDFS"); + return ret; } - } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java new file mode 100755 index 00000000..d4924972 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignService.java @@ -0,0 +1,272 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.*;
+import org.onap.datalake.feeder.domain.Design;
+import org.onap.datalake.feeder.dto.DesignConfig;
+import org.onap.datalake.feeder.enumeration.DesignTypeEnum;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.DesignTypeRepository;
+import org.onap.datalake.feeder.repository.DesignRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
+import org.onap.datalake.feeder.util.HttpClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for portalDesigns
+ *
+ * @author guochunmeng
+ */
+
+@Service
+public class DesignService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private static String POST_FLAG;
+
+ private static String URL_FlAG;
+
+ @Autowired
+ private DesignRepository designRepository;
+
+ @Autowired
+ private TopicNameRepository topicNameRepository;
+
+ @Autowired
+ private DesignTypeRepository designTypeRepository;
+
+ @Autowired
+ private ApplicationConfiguration applicationConfiguration;
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ public Design fillDesignConfiguration(DesignConfig designConfig) {
+ Design design = new Design();
+ fillDesign(designConfig, design);
+ return design;
+ }
+
+ public void fillDesignConfiguration(DesignConfig designConfig, Design design) {
+ fillDesign(designConfig, design);
+ }
+
+ private void fillDesign(DesignConfig designConfig, Design design) throws IllegalArgumentException {
+
+ design.setId(designConfig.getId());
+ design.setBody(designConfig.getBody());
+ design.setName(designConfig.getName());
+ design.setNote(designConfig.getNote());
+ design.setSubmitted(designConfig.getSubmitted());
+
+ if (designConfig.getTopicName() == null)
+ throw new IllegalArgumentException("Can not find topicName in tpoic_name, topic name: " + designConfig.getTopicName());
+ Optional<TopicName> topicName = topicNameRepository.findById(designConfig.getTopicName());
+ if (!topicName.isPresent())
+ throw new IllegalArgumentException("topicName is null " + designConfig.getTopicName());
+ design.setTopicName(topicName.get());
+
+ if (designConfig.getDesignType() == null)
+ throw new IllegalArgumentException("Can not find designType in design_type, designType id " + designConfig.getDesignType());
+ Optional<DesignType> designType = designTypeRepository.findById(designConfig.getDesignType());
+ if (!designType.isPresent())
+ throw new IllegalArgumentException("designType is null");
+ design.setDesignType(designType.get());
+
+ Set<Db> dbs = new HashSet<>();
+ if (designConfig.getDbs() != null) {
+ for (Integer item : designConfig.getDbs()) {
+ Optional<Db> db = dbRepository.findById(item);
+ if (db.isPresent()) {
+ dbs.add(db.get());
+ }
+ }
+ if (!dbs.isEmpty())
+ design.setDbs(dbs);
+ else {
+ design.getDbs().clear();
+ design.setDbs(dbs);
+ }
+ } else {
+ design.setDbs(dbs);
+ }
+ }
+
+ public Design getDesign(Integer id) {
+
+ Optional<Design> ret = designRepository.findById(id);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public List<DesignConfig> queryAllDesign() {
+
+ List<Design> designList = null;
+ List<DesignConfig> designConfigList = new ArrayList<>();
+ designList = (List<Design>) designRepository.findAll();
+ if (!designList.isEmpty()) {
+ log.info("DesignList is not null");
+ for (Design design : designList) {
+ designConfigList.add(design.getDesignConfig());
+ }
+ }
+ return designConfigList;
+ }
+
+ public Map<Integer, Boolean> deploy(Design design) {
+ Map<Integer, Boolean> resultMap = null;
+ DesignType designType = design.getDesignType();
+ DesignTypeEnum designTypeEnum = DesignTypeEnum.valueOf(designType.getId());
+
+ switch (designTypeEnum) {
+ case KIBANA_DB:
+ log.info("Deploy kibana dashboard");
+ resultMap = deployKibanaDashboardImport(design);
+ deploySave(resultMap, design);
+ break;
+ case ES_MAPPING:
+ log.info("Deploy elasticsearch mapping template");
+ resultMap = postEsMappingTemplate(design, design.getTopicName().getId().toLowerCase());
+ deploySave(resultMap, design);
+ break;
+ default:
+ log.error("Not implemented {}", designTypeEnum);
+ break;
+ }
+ log.info("Response resultMap: " + resultMap);
+ return resultMap;
+ }
+
+ private Map<Integer, Boolean> deployKibanaDashboardImport(Design design) {
+ URL_FlAG = "Kibana";
+ POST_FLAG = "KibanaDashboardImport";
+ String requestBody = design.getBody();
+ Set<Db> dbs = design.getDbs();
+ Map<Integer, Boolean> deployKibanaMap = new HashMap<>();
+
+ if (!dbs.isEmpty()) {
+ Map<Integer, String> map = urlMap(dbs, URL_FlAG);
+ log.info("Deploy kibana dashboard url map: " + map);
+ if (!map.isEmpty()) {
+ Iterator<Map.Entry<Integer, String>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Integer, String> entry = it.next();
+ deployKibanaMap.put(entry.getKey(), HttpClientUtil.sendHttpClientPost(entry.getValue(), requestBody, POST_FLAG, URL_FlAG));
+ }
+ }
+ return deployKibanaMap;
+ } else {
+ return deployKibanaMap;
+ }
+ }
+
+ /**
+ * successed resp: { "acknowledged": true }
+ *
+ * @param design
+ * @param templateName
+ * @return flag
+ */
+ public Map<Integer, Boolean> postEsMappingTemplate(Design design, String templateName) {
+ URL_FlAG = "Elasticsearch";
+ POST_FLAG = "ElasticsearchMappingTemplate";
+ String requestBody = design.getBody();
+ Set<Db> dbs = design.getDbs();
+ Map<Integer, Boolean> deployEsMap = new HashMap<>();
+
+ if (!dbs.isEmpty()) {
+ Map<Integer, String> map = urlMap(dbs, URL_FlAG);
+ log.info("Deploy elasticsearch url map: " + map);
+ if (!map.isEmpty()) {
+ Iterator<Map.Entry<Integer, String>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Integer, String> entry = it.next();
+ deployEsMap.put(entry.getKey(), HttpClientUtil.sendHttpClientPost(entry.getValue()+templateName, requestBody, POST_FLAG, URL_FlAG));
+ }
+ }
+ return deployEsMap;
+ } else {
+ return deployEsMap;
+ }
+ }
+
+ private Map<Integer, String> urlMap (Set<Db> dbs, String flag) {
+ Map<Integer, String> map = new HashMap<>();
+ for (Db item : dbs) {
+ if (item.isEnabled()) {
+ map.put(item.getId(), httpRequestUrl(item.getHost(), item.getPort(), flag));
+ }
+ }
+ return map;
+ }
+
+ private String httpRequestUrl(String host, Integer port, String urlFlag) {
+ String url = "";
+ switch (urlFlag) {
+ case "Kibana":
+ if (port == null) {
+ port = applicationConfiguration.getKibanaPort();
+ }
+ url = "http://" + host + ":" + port + applicationConfiguration.getKibanaDashboardImportApi();
+ log.info("Kibana url: " + url);
+ break;
+ case "Elasticsearch":
+ if (port == null) {
+ port = applicationConfiguration.getEsPort();
+ }
+ url = "http://" + host + ":" + port + applicationConfiguration.getEsTemplateMappingApi();
+ log.info("Elasticsearch url: " + url);
+ break;
+ default:
+ break;
+ }
+ return url;
+ }
+
+ private void deploySave(Map<Integer, Boolean> map, Design design) {
+ if (!map.isEmpty()) {
+ Iterator<Map.Entry<Integer, Boolean>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Integer, Boolean> entry = it.next();
+ if (entry.getValue()) {
+ design.setSubmitted(true);
+ designRepository.save(design);
+ log.info("Status was modified");
+ }
+ }
+ }
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java new file mode 100755 index 00000000..58bc35e4 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DesignTypeService.java @@ -0,0 +1,62 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.DesignType;
+import org.onap.datalake.feeder.dto.DesignTypeConfig;
+import org.onap.datalake.feeder.repository.DesignTypeRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for designTypes
+ *
+ * @author guochunmeng
+ */
+@Service
+public class DesignTypeService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ DesignTypeRepository designTypeRepository;
+
+ public List<DesignTypeConfig> getDesignTypes(){
+
+ List<DesignType> designTypeList = null;
+ List<DesignTypeConfig> designTypeConfigList = new ArrayList<>();
+ designTypeList = (List<DesignType>)designTypeRepository.findAll();
+ if (designTypeList != null && !designTypeList.isEmpty()) {
+ log.info("DesignTypeList is not null");
+ for(DesignType designType : designTypeList) {
+ designTypeConfigList.add(designType.getDesignTypeConfig());
+ }
+ }
+
+ return designTypeConfigList;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index 3be5be6e..671234ba 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -21,24 +21,28 @@ package org.onap.datalake.feeder.service; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import org.apache.commons.collections.CollectionUtils; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; /** @@ -48,6 +52,7 @@ import org.springframework.stereotype.Service; * */ @Service +@Scope("prototype") public class DmaapService { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -60,16 +65,29 @@ public class DmaapService { private ZooKeeper zk; + private Kafka kafka; + + public DmaapService(Kafka kafka) { + this.kafka = kafka; + } + @PreDestroy public void cleanUp() throws InterruptedException { - if (zk != null) { - zk.close(); + config.getShutdownLock().readLock().lock(); + + try { + if (zk != null) { + log.info("cleanUp() called, close zk."); + zk.close(); + } + } finally { + config.getShutdownLock().readLock().unlock(); } } @PostConstruct private void init() throws IOException, InterruptedException { - zk = connect(config.getDmaapZookeeperHostPort()); + zk = connect(kafka.getZooKeeper()); } //get all topic names from Zookeeper @@ -77,11 +95,11 @@ public class DmaapService { public List<String> getTopics() { try { if (zk == null) { - zk = connect(config.getDmaapZookeeperHostPort()); + zk = connect(kafka.getZooKeeper()); } - log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort()); + log.info("connecting to ZooKeeper {} for a list of topics.", kafka.getZooKeeper()); List<String> topics = zk.getChildren("/brokers/topics", false); - String[] excludes = config.getDmaapKafkaExclude(); + String[] excludes = kafka.getExcludedTopic().split(","); topics.removeAll(Arrays.asList(excludes)); log.info("list of topics: {}", topics); return topics; @@ -93,7 +111,7 @@ public class DmaapService { } private ZooKeeper connect(String host) throws IOException, InterruptedException { - log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort()); + log.info("connecting to ZooKeeper {} ...", kafka.getZooKeeper()); CountDownLatch connectedSignal = new CountDownLatch(1); ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() { public void process(WatchedEvent we) { @@ -119,18 +137,20 @@ public class DmaapService { return ret; } */ - public List<TopicConfig> getActiveTopicConfigs() throws IOException { + public Map<String, List<EffectiveTopic>> getActiveEffectiveTopic() throws IOException { log.debug("entering getActiveTopicConfigs()..."); - List<String> allTopics = getTopics(); + List<String> allTopics = getTopics(); //topics in Kafka cluster TODO update table topic_name with new topics - List<TopicConfig> ret = new ArrayList<>(allTopics.size()); + Map<String, List<EffectiveTopic>> ret = new HashMap<>(); for (String topicStr : allTopics) { log.debug("get topic setting from DB: {}.", topicStr); - TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true); - if (topicConfig.isEnabled()) { - ret.add(topicConfig); + List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true); + if(CollectionUtils.isNotEmpty(effectiveTopics )) { + log.debug("add effectiveTopics {}:{}.", topicStr, effectiveTopics); + ret.put(topicStr , effectiveTopics); } + } return ret; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java new file mode 100644 index 00000000..2e959fa2 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.service; + +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.dto.KafkaConfig; +import org.onap.datalake.feeder.repository.KafkaRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +/** + * Service for kafkas + * + * @author guochunmeng + */ +@Service +public class KafkaService { + + @Autowired + private KafkaRepository kafkaRepository; + + public Kafka getKafkaById(int id) { + + Optional<Kafka> ret = kafkaRepository.findById(id); + return ret.isPresent() ? ret.get() : null; + } + + public List<KafkaConfig> getAllKafka() { + + List<KafkaConfig> kafkaConfigList = new ArrayList<>(); + Iterable<Kafka> kafkaIterable = kafkaRepository.findAll(); + for(Kafka portal : kafkaIterable) { + kafkaConfigList.add(portal.getKafkaConfig()); + } + return kafkaConfigList; + } + + public Kafka fillKafkaConfiguration(KafkaConfig kafkaConfig) { + Kafka kafka = new Kafka(); + fillKafka(kafkaConfig, kafka); + return kafka; + } + + public void fillKafkaConfiguration(KafkaConfig kafkaConfig, Kafka kafka) { + fillKafka(kafkaConfig, kafka); + } + + private void fillKafka(KafkaConfig kafkaConfig, Kafka kafka) { + + kafka.setId(kafkaConfig.getId()); + kafka.setBrokerList(kafkaConfig.getBrokerList()); + kafka.setConsumerCount(kafkaConfig.getConsumerCount()); + kafka.setEnabled(kafkaConfig.isEnabled()); + kafka.setExcludedTopic(kafkaConfig.getExcludedTopic()); + kafka.setIncludedTopic(kafkaConfig.getIncludedTopic()); + kafka.setGroup(kafkaConfig.getGroup()); + kafka.setLogin(kafkaConfig.getLogin()); + kafka.setName(kafkaConfig.getName()); + kafka.setPass(kafkaConfig.getPass()); + kafka.setSecure(kafkaConfig.isSecure()); + kafka.setSecurityProtocol(kafkaConfig.getSecurityProtocol()); + kafka.setTimeout(kafkaConfig.getTimeout()); + kafka.setZooKeeper(kafkaConfig.getZooKeeper()); + + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java index 7ed88797..09a59ee3 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -21,14 +21,19 @@ package org.onap.datalake.feeder.service; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** @@ -45,17 +50,20 @@ public class PullService { private boolean isRunning = false; private ExecutorService executorService; - private Thread topicConfigPollingThread; + private Set<Puller> pullers; @Autowired - private Puller puller; + private KafkaRepository kafkaRepository; @Autowired private TopicConfigPollingService topicConfigPollingService; - + @Autowired private ApplicationConfiguration config; + @Autowired + private ApplicationContext context; + /** * @return the isRunning */ @@ -73,23 +81,33 @@ public class PullService { return; } - logger.info("start pulling ..."); - int numConsumers = config.getKafkaConsumerCount(); - executorService = Executors.newFixedThreadPool(numConsumers); + logger.info("PullService starting ..."); - for (int i = 0; i < numConsumers; i++) { - executorService.submit(puller); + pullers = new HashSet<>(); + executorService = Executors.newCachedThreadPool(); + + Iterable<Kafka> kafkas = kafkaRepository.findAll(); + for (Kafka kafka : kafkas) { + if (kafka.isEnabled()) { + doKafka(kafka); + } } - - topicConfigPollingThread = new Thread(topicConfigPollingService); - topicConfigPollingThread.setName("TopicConfigPolling"); - topicConfigPollingThread.start(); + executorService.submit(topicConfigPollingService); + isRunning = true; Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); } + private void doKafka(Kafka kafka) { + Puller puller = context.getBean(Puller.class, kafka); + pullers.add(puller); + for (int i = 0; i < kafka.getConsumerCount(); i++) { + executorService.submit(puller); + } + } + /** * stop pulling */ @@ -98,20 +116,23 @@ public class PullService { return; } - logger.info("stop pulling ..."); - puller.shutdown(); - - logger.info("stop TopicConfigPollingService ..."); - topicConfigPollingService.shutdown(); - + config.getShutdownLock().writeLock().lock(); try { - topicConfigPollingThread.join(); - + logger.info("stop pulling ..."); + for (Puller puller : pullers) { + puller.shutdown(); + } + + logger.info("stop executorService ..."); executorService.shutdown(); executorService.awaitTermination(120L, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.error("executor.awaitTermination", e); + logger.error("shutdown(): executor.awaitTermination", e); Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.error("shutdown error.", e); + } finally { + config.getShutdownLock().writeLock().unlock(); } isRunning = false; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java index 9e4ab455..ab99ad09 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java @@ -26,11 +26,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PostConstruct; import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -39,10 +39,10 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; @@ -54,7 +54,7 @@ import org.springframework.stereotype.Service; */ @Service -//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Scope("prototype") public class Puller implements Runnable { @Autowired @@ -68,10 +68,17 @@ public class Puller implements Runnable { private final Logger log = LoggerFactory.getLogger(this.getClass()); + //KafkaConsumer is not thread-safe. private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text private boolean active = false; private boolean async; + + private Kafka kafka; + + public Puller(Kafka kafka) { + this.kafka = kafka; + } @PostConstruct private void init() { @@ -81,8 +88,8 @@ public class Puller implements Runnable { private Properties getConsumerConfig() { Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, kafka.getGroup()); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId())); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -90,9 +97,12 @@ public class Puller implements Runnable { consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - // consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); - // consumerConfig.put("sasl.mechanism", "PLAIN"); - + if (kafka.isSecure()) { + String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + kafka.getLogin() + " password=" + kafka.getPass() + " serviceName=kafka;"; + consumerConfig.put("sasl.jaas.config", jaas); + consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafka.getSecurityProtocol()); + consumerConfig.put("sasl.mechanism", "PLAIN"); + } return consumerConfig; } @@ -103,7 +113,7 @@ public class Puller implements Runnable { public void run() { active = true; Properties consumerConfig = getConsumerConfig(); - log.info("Kafka ConsumerConfig: {}", consumerConfig); + log.info("Kafka: {}, ConsumerConfig: {}", kafka, consumerConfig); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); consumerLocal.set(consumer); @@ -111,8 +121,8 @@ public class Puller implements Runnable { try { while (active) { - if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well - List<String> topics = topicConfigPollingService.getActiveTopics(); + if (topicConfigPollingService.isActiveTopicsChanged(kafka)) { + Collection<String> topics = topicConfigPollingService.getActiveTopics(kafka); log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics); consumer.subscribe(topics, rebalanceListener); } @@ -132,7 +142,7 @@ public class Puller implements Runnable { KafkaConsumer<String, String> consumer = consumerLocal.get(); log.debug("pulling..."); - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(kafka.getTimeout())); log.debug("done pulling."); if (records != null && records.count() > 0) { @@ -144,10 +154,10 @@ public class Puller implements Runnable { messages.add(Pair.of(record.timestamp(), record.value())); //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); } - storeService.saveMessages(partition.topic(), messages); + storeService.saveMessages(kafka, partition.topic(), messages); log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + if (!async) {//for reliability, sync commit offset to Kafka right after saving the data to data store, this slows down a bit long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 2a2f997e..0e54b9b5 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; import javax.annotation.PostConstruct; @@ -32,8 +34,11 @@ import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.service.db.DbStoreService; import org.onap.datalake.feeder.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,19 +65,10 @@ public class StoreService { private ApplicationConfiguration config; @Autowired - private TopicConfigPollingService configPollingService; - - @Autowired - private MongodbService mongodbService; - - @Autowired - private CouchbaseService couchbaseService; - - @Autowired - private ElasticsearchService elasticsearchService; + private DbService dbService; @Autowired - private HdfsService hdfsService; + private TopicConfigPollingService configPollingService; private ObjectMapper yamlReader; @@ -81,43 +77,57 @@ public class StoreService { yamlReader = new ObjectMapper(new YAMLFactory()); } - public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text + public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text if (CollectionUtils.isEmpty(messages)) { return; } - TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr); + Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr); + for (EffectiveTopic effectiveTopic : effectiveTopics) { + saveMessagesForTopic(effectiveTopic, messages); + } + } + + private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) { + if (!effectiveTopic.getTopic().isEnabled()) { + log.error("we should not come here {}", effectiveTopic); + return; + } List<JSONObject> docs = new ArrayList<>(); for (Pair<Long, String> pair : messages) { try { - docs.add(messageToJson(topicConfig, pair)); + docs.add(messageToJson(effectiveTopic, pair)); } catch (Exception e) { //may see org.json.JSONException. log.error("Error when converting this message to JSON: " + pair.getRight(), e); } } - saveJsons(topicConfig, docs, messages); + Set<Db> dbs = effectiveTopic.getTopic().getDbs(); + + for (Db db : dbs) { + if (db.isTool() || db.isDruid() || !db.isEnabled()) { + continue; + } + DbStoreService dbStoreService = dbService.findDbStoreService(db); + if (dbStoreService != null) { + dbStoreService.saveJsons(effectiveTopic, docs); + } + } } - private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException { + private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); - //for debug, to be remove - // String topicStr = topic.getId(); - // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) { - // log.debug("{} ={}", topicStr, text); - //} - - boolean storeRaw = topicConfig.isSaveRaw(); + boolean storeRaw = effectiveTopic.getTopic().isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topicConfig.getDataFormat2(); + DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2(); switch (dataFormat) { case JSON: @@ -148,15 +158,15 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } - if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { - String[] paths = topicConfig.getAggregateArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) { + String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2(); for (String path : paths) { JsonUtil.arrayAggregate(path, json); } } - if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { - String[] paths = topicConfig.getFlattenArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) { + String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2(); for (String path : paths) { JsonUtil.flattenArray(path, json); } @@ -165,29 +175,11 @@ public class StoreService { return json; } - private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) { - if (topic.supportMongoDB()) { - mongodbService.saveJsons(topic, jsons); - } - - if (topic.supportCouchbase()) { - couchbaseService.saveJsons(topic, jsons); - } - - if (topic.supportElasticsearch()) { - elasticsearchService.saveJsons(topic, jsons); - } - - if (topic.supportHdfs()) { - hdfsService.saveMessages(topic, messages); - } - } - public void flush() { //force flush all buffer - hdfsService.flush(); + // hdfsService.flush(); } public void flushStall() { //flush stall buffer - hdfsService.flushStall(); + // hdfsService.flushStall(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 58b27834..a02cd6a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -21,23 +21,27 @@ package org.onap.datalake.feeder.service; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** - * Service to check topic changes in Kafka and topic setting updates + * Service to check topic changes in Kafka and topic setting updates in DB * * @author Guobiao Mo * @@ -51,70 +55,93 @@ public class TopicConfigPollingService implements Runnable { ApplicationConfiguration config; @Autowired - private DmaapService dmaapService; + private ApplicationContext context; - //effective TopicConfig Map - private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>(); - - //monitor Kafka topic list changes - private List<String> activeTopics; - private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1); - private int currentActiveTopicsVersion = -1; + @Autowired + private KafkaRepository kafkaRepository; + + //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic. + private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>(); + //private Map<String, TopicConfig> effectiveTopicConfigMap; + + //monitor Kafka topic list changes, key is kafka id, value is active Topics + private Map<Integer, Set<String>> activeTopicMap; + + private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version + private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version + private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService private boolean active = false; @PostConstruct private void init() { try { - log.info("init(), ccalling poll()..."); - activeTopics = poll(); - currentActiveTopicsVersion++; + log.info("init(), calling poll()..."); + activeTopicMap = poll(); } catch (Exception ex) { log.error("error connection to HDFS.", ex); } } - public boolean isActiveTopicsChanged(boolean update) { - boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get(); - log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get()); - if (changed && update) { - activeTopicsVersionLocal.set(currentActiveTopicsVersion); + public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version + int kafkaId = kafka.getId(); + int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version + int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0); + + boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion; + log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion); + if (changed) { + activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion); } return changed; } - public List<String> getActiveTopics() { - return activeTopics; + //get a list of topic names to monitor + public Collection<String> getActiveTopics(Kafka kafka) { + return activeTopicMap.get(kafka.getId()); } - public TopicConfig getEffectiveTopicConfig(String topicStr) { - return effectiveTopicConfigMap.get(topicStr); + //get the EffectiveTopics given kafka and topic name + public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) { + Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId()); + return effectiveTopicMapKafka.get(topicStr); } @Override public void run() { active = true; log.info("TopicConfigPollingService started."); - + while (active) { try { //sleep first since we already pool in init() - Thread.sleep(config.getDmaapCheckNewTopicInterval()); + Thread.sleep(config.getCheckTopicInterval()); + if(!active) { + break; + } } catch (InterruptedException e) { log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e); Thread.currentThread().interrupt(); } try { - List<String> newTopics = poll(); - if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { - log.info("activeTopics list is updated, old={}", activeTopics); - log.info("activeTopics list is updated, new={}", newTopics); - - activeTopics = newTopics; - currentActiveTopicsVersion++; - } else { - log.debug("activeTopics list is not updated."); + Map<Integer, Set<String>> newTopicsMap = poll(); + + for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) { + Integer kafkaId = entry.getKey(); + Set<String> newTopics = entry.getValue(); + + Set<String> activeTopics = activeTopicMap.get(kafkaId); + + if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { + log.info("activeTopics list is updated, old={}", activeTopics); + log.info("activeTopics list is updated, new={}", newTopics); + + activeTopicMap.put(kafkaId, newTopics); + currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1); + } else { + log.debug("activeTopics list is not updated."); + } } } catch (IOException e) { log.error("dmaapService.getActiveTopics()", e); @@ -128,13 +155,31 @@ public class TopicConfigPollingService implements Runnable { active = false; } - private List<String> poll() throws IOException { + private Map<Integer, Set<String>> poll() throws IOException { + Map<Integer, Set<String>> ret = new HashMap<>(); + Iterable<Kafka> kafkas = kafkaRepository.findAll(); + for (Kafka kafka : kafkas) { + if (kafka.isEnabled()) { + Set<String> topics = poll(kafka); + ret.put(kafka.getId(), topics); + } + } + return ret; + } + + private Set<String> poll(Kafka kafka) throws IOException { log.debug("poll(), use dmaapService to getActiveTopicConfigs..."); - List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs(); - activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); - List<String> ret = new ArrayList<>(activeTopicConfigs.size()); - activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName())); + DmaapService dmaapService = dmaapServiceMap.get(kafka.getId()); + if(dmaapService==null) { + dmaapService = context.getBean(DmaapService.class, kafka); + dmaapServiceMap.put(kafka.getId(), dmaapService); + } + + Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic(); + effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics); + + Set<String> ret = activeEffectiveTopics.keySet(); return ret; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 64e8b8b1..043cc653 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -21,23 +21,31 @@ package org.onap.datalake.feeder.service; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; +import org.apache.commons.collections.CollectionUtils; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.KafkaRepository; +import org.onap.datalake.feeder.repository.TopicNameRepository; import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.db.ElasticsearchService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service for topics + * Service for topics * * @author Guobiao Mo * @@ -49,72 +57,93 @@ public class TopicService { @Autowired private ApplicationConfiguration config; - + + @Autowired + private TopicNameRepository topicNameRepository; + @Autowired private TopicRepository topicRepository; @Autowired - private ElasticsearchService elasticsearchService; + private DbRepository dbRepository; + @Autowired + private DbService dbService; @Autowired - private DbRepository dbRepository; + private KafkaRepository kafkaRepository; + + public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException { - public TopicConfig getEffectiveTopic(String topicStr) { - try { - return getEffectiveTopic(topicStr, false); - } catch (IOException e) { - log.error(topicStr, e); + List<Topic> topics = findTopics(kafka, topicStr); + if (CollectionUtils.isEmpty(topics)) { + topics = new ArrayList<>(); + topics.add(getDefaultTopic(kafka)); } - return null; - } - public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { - Topic topic = getTopic(topicStr); - if (topic == null) { - topic = getDefaultTopic(); + List<EffectiveTopic> ret = new ArrayList<>(); + for (Topic topic : topics) { + if (!topic.isEnabled()) { + continue; + } + ret.add(new EffectiveTopic(topic, topicStr)); + + if (ensureTableExist) { + for (Db db : topic.getDbs()) { + if (db.isElasticsearch()) { + ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db); + elasticsearchService.ensureTableExist(topicStr); + } + } + } } - TopicConfig topicConfig = topic.getTopicConfig(); - topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic + + return ret; + } + + //TODO use query + public List<Topic> findTopics(Kafka kafka, String topicStr) { + List<Topic> ret = new ArrayList<>(); - if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) { - elasticsearchService.ensureTableExist(topicStr); + Iterable<Topic> allTopics = topicRepository.findAll(); + for(Topic topic: allTopics) { + if(topic.getKafkas().contains(kafka ) && topic.getTopicName().getId().equals(topicStr)){ + ret.add(topic); + } } - return topicConfig; + return ret; } - public Topic getTopic(String topicStr) { - Optional<Topic> ret = topicRepository.findById(topicStr); + public Topic getTopic(int topicId) { + Optional<Topic> ret = topicRepository.findById(topicId); return ret.isPresent() ? ret.get() : null; } - public Topic getDefaultTopic() { - return getTopic(config.getDefaultTopicName()); + public Topic getDefaultTopic(Kafka kafka) { + return findTopics(kafka, config.getDefaultTopicName()).get(0); } - public boolean istDefaultTopic(Topic topic) { + public boolean isDefaultTopic(Topic topic) { if (topic == null) { return false; } return topic.getName().equals(config.getDefaultTopicName()); } - public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) - { + public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) { fillTopic(tConfig, wTopic); } - public Topic fillTopicConfiguration(TopicConfig tConfig) - { + public Topic fillTopicConfiguration(TopicConfig tConfig) { Topic topic = new Topic(); fillTopic(tConfig, topic); return topic; } - private void fillTopic(TopicConfig tConfig, Topic topic) - { + private void fillTopic(TopicConfig tConfig, Topic topic) { Set<Db> relateDb = new HashSet<>(); - topic.setName(tConfig.getName()); + topic.setId(tConfig.getId()); + topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get()); topic.setLogin(tConfig.getLogin()); topic.setPass(tConfig.getPassword()); topic.setEnabled(tConfig.isEnabled()); @@ -126,24 +155,38 @@ public class TopicService { topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); - if(tConfig.getSinkdbs() != null) { + if (tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { Db sinkdb = dbRepository.findByName(item); if (sinkdb != null) { relateDb.add(sinkdb); } } - if(relateDb.size() > 0) + if (!relateDb.isEmpty()) topic.setDbs(relateDb); - else if(relateDb.size() == 0) - { + else { topic.getDbs().clear(); } - }else - { + } else { topic.setDbs(relateDb); } + Set<Kafka> relateKafka = new HashSet<>(); + if (tConfig.getKafkas() != null) { + for (int item : tConfig.getKafkas()) { + Optional<Kafka> sinkKafka = kafkaRepository.findById(item); + if (sinkKafka.isPresent()) { + relateKafka.add(sinkKafka.get()); + } + } + if (!relateKafka.isEmpty()) { + topic.setKafkas(relateKafka); + } else { + topic.getKafkas().clear(); + } + } else { + topic.setKafkas(relateKafka); + } } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java index d7d5f873..44b940a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.util.ArrayList; import java.util.List; @@ -30,10 +30,12 @@ import javax.annotation.PreDestroy; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; import com.couchbase.client.java.Bucket; @@ -55,25 +57,27 @@ import rx.functions.Func1; * */ @Service -public class CouchbaseService { +@Scope("prototype") +public class CouchbaseService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired ApplicationConfiguration config; - - @Autowired - private DbService dbService; - + + private Db couchbase; + //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html Bucket bucket; - private boolean isReady = false; - + + public CouchbaseService(Db db) { + couchbase = db; + } + @PostConstruct - private void init() { + @Override + public void init() { // Initialize Couchbase Connection try { - Db couchbase = dbService.getCouchbase(); - //this tunes the SDK (to customize connection timeout) CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s .build(); @@ -84,19 +88,27 @@ public class CouchbaseService { bucket.bucketManager().createN1qlPrimaryIndex(true, false); log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin()); - isReady = true; +// isReady = true; } catch (Exception ex) { log.error("error connection to Couchbase.", ex); - isReady = false; + // isReady = false; } } @PreDestroy public void cleanUp() { - bucket.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("bucket.close() at cleanUp."); + bucket.close(); + } finally { + config.getShutdownLock().readLock().unlock(); + } } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { List<JsonDocument> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { //convert to Couchbase JsonObject from org.json JSONObject @@ -105,9 +117,9 @@ public class CouchbaseService { long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() //setup TTL - int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second + int expiry = (int) (timestamp / 1000L) + effectiveTopic.getTopic().getTtl() * 3600 * 24; //in second - String id = getId(topic, json); + String id = getId(effectiveTopic.getTopic(), json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } @@ -126,10 +138,10 @@ public class CouchbaseService { } catch (Exception e) { log.error("error saving to Couchbase.", e); } - log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); + log.debug("saved text to topic = {}, this batch count = {} ", effectiveTopic, documents.size()); } - public String getId(TopicConfig topic, JSONObject json) { + public String getId(Topic topic, JSONObject json) { //if this topic requires extract id from JSON String id = topic.getMessageId(json); if (id != null) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java new file mode 100644 index 00000000..c873c010 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java @@ -0,0 +1,39 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service.db; + +import java.util.List; + +import org.json.JSONObject; +import org.onap.datalake.feeder.domain.EffectiveTopic; + +/** + * Interface for all db store services + * + * @author Guobiao Mo + * + */ +public interface DbStoreService { + + void saveJsons(EffectiveTopic topic, List<JSONObject> jsons); + + void init(); +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java index 2806e48b..e303fa9b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.io.IOException; import java.util.List; @@ -47,11 +47,13 @@ import org.elasticsearch.rest.RestStatus; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; /** @@ -61,24 +63,28 @@ import org.springframework.stereotype.Service; * */ @Service -public class ElasticsearchService { +@Scope("prototype") +public class ElasticsearchService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db elasticsearch; @Autowired private ApplicationConfiguration config; - @Autowired - private DbService dbService; - - private RestHighLevelClient client; + private RestHighLevelClient client;//thread safe ActionListener<BulkResponse> listener; - + + public ElasticsearchService(Db db) { + elasticsearch = db; + } + //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html @PostConstruct - private void init() { - Db elasticsearch = dbService.getElasticsearch(); + @Override + public void init() { String elasticsearchHost = elasticsearch.getHost(); // Initialize the Connection @@ -89,7 +95,9 @@ public class ElasticsearchService { listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { - + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } } @Override @@ -101,7 +109,16 @@ public class ElasticsearchService { @PreDestroy public void cleanUp() throws IOException { - client.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("cleanUp() closing Elasticsearch client."); + client.close(); + } catch (IOException e) { + log.error("client.close() at cleanUp.", e); + } finally { + config.getShutdownLock().readLock().unlock(); + } } public void ensureTableExist(String topic) throws IOException { @@ -119,35 +136,41 @@ public class ElasticsearchService { } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { + BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { - if (topic.isCorrelateClearedMessage()) { - boolean found = correlateClearedMessage(topic, json); + if (effectiveTopic.getTopic().isCorrelateClearedMessage()) { + boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json); if (found) { continue; } - } - - String id = topic.getMessageId(json); //id can be null - - request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); + } + + String id = effectiveTopic.getTopic().getMessageId(json); //id can be null + + request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } - log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); + log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size()); if (config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); } else { try { - client.bulk(request, RequestOptions.DEFAULT); + BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } } catch (IOException e) { - log.error(topic.getName(), e); + log.error(effectiveTopic.getName(), e); } } + } - + /** * * @param topic @@ -159,7 +182,7 @@ public class ElasticsearchService { * source. So use the get API, three parameters: index, type, document * id */ - private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { + private boolean correlateClearedMessage(Topic topic, JSONObject json) { boolean found = false; String eName = null; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java index 135a2c09..1725ee41 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.io.IOException; import java.net.InetAddress; @@ -32,23 +32,23 @@ import java.util.Map; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ShutdownHookManager; +import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.onap.datalake.feeder.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; import lombok.Getter; -import lombok.Setter; /** * Service to write data to HDFS @@ -57,24 +57,22 @@ import lombok.Setter; * */ @Service -public class HdfsService { +@Scope("prototype") +public class HdfsService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); - @Autowired - ApplicationConfiguration config; + private Db hdfs; @Autowired - private DbService dbService; + ApplicationConfiguration config; FileSystem fileSystem; - private boolean isReady = false; private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new); private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS")); - @Setter @Getter private class Buffer { long lastFlush; @@ -93,7 +91,7 @@ public class HdfsService { lastFlush = System.currentTimeMillis(); } } catch (IOException e) { - log.error("error saving to HDFS." + topic, e); + log.error("{} error saving to HDFS. {}", topic, e.getMessage()); } } @@ -104,12 +102,21 @@ public class HdfsService { } } - public void addData(List<Pair<Long, String>> messages) { + /* + public void addData(List<Pair<Long, String>> messages) { + if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer + lastFlush = System.currentTimeMillis(); + } + + messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used + } + */ + public void addData2(List<JSONObject> messages) { if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer lastFlush = System.currentTimeMillis(); } - messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used + messages.stream().forEach(message -> data.add(message.toString())); } private void saveMessages(String topic, List<String> bufferList) throws IOException { @@ -134,20 +141,24 @@ public class HdfsService { out.writeUTF(message); out.write('\n'); } catch (IOException e) { - log.error("error writing to HDFS.", e); + log.error("error writing to HDFS. {}", e.getMessage()); } }); out.close(); + log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath); } } + public HdfsService(Db db) { + hdfs = db; + } + @PostConstruct - private void init() { + @Override + public void init() { // Initialize HDFS Connection try { - Db hdfs = dbService.getHdfs(); - //Get configuration of Hadoop system Configuration hdfsConfig = new Configuration(); @@ -161,45 +172,73 @@ public class HdfsService { fileSystem = FileSystem.get(hdfsConfig); - isReady = true; + //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data + ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get(); + hadoopShutdownHookManager.clearShutdownHooks(); + } catch (Exception ex) { log.error("error connection to HDFS.", ex); - isReady = false; } } @PreDestroy public void cleanUp() { + config.getShutdownLock().readLock().lock(); + try { + log.info("fileSystem.close() at cleanUp."); flush(); fileSystem.close(); } catch (IOException e) { log.error("fileSystem.close() at cleanUp.", e); + } finally { + config.getShutdownLock().readLock().unlock(); } } public void flush() { + log.info("Force flush ALL data, regardless of stall"); bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic)); } //if no new data comes in for a topic for a while, need to flush its buffer public void flushStall() { + log.debug("Flush stall data"); bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); } - public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) { + /* + //used if raw data should be saved + public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) { + String topicStr = topic.getName(); + + Map<String, Buffer> bufferMap = bufferLocal.get(); + final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); + + buffer.addData(messages); + + if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) { + buffer.flush(topicStr); + } else { + log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize()); + } + } + */ + @Override + public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) { String topicStr = topic.getName(); Map<String, Buffer> bufferMap = bufferLocal.get(); final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); - buffer.addData(messages); + buffer.addData2(jsons); if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) { buffer.flush(topicStr); } else { - log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize()); + log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize()); } + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java index 32d21c62..eb8a3a16 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.util.ArrayList; import java.util.HashMap; @@ -34,11 +34,12 @@ import org.bson.Document; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; import com.mongodb.bulk.BulkWriteError; @@ -47,6 +48,7 @@ import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; import com.mongodb.MongoCredential; +import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -59,26 +61,30 @@ import com.mongodb.client.model.InsertManyOptions; * */ @Service -public class MongodbService { +@Scope("prototype") +public class MongodbService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db mongodb; @Autowired private ApplicationConfiguration config; private boolean dbReady = false; - @Autowired - private DbService dbService; - private MongoDatabase database; private MongoClient mongoClient; + //MongoCollection is ThreadSafe private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>(); private InsertManyOptions insertManyOptions; + public MongodbService(Db db) { + mongodb = db; + } + @PostConstruct - private void init() { - Db mongodb = dbService.getMongoDB(); - + @Override + public void init() { String host = mongodb.getHost(); Integer port = mongodb.getPort(); @@ -103,14 +109,14 @@ public class MongodbService { builder.sslEnabled(Boolean.TRUE.equals(mongodb.getEncrypt()));// getEncrypt() can be null } MongoClientOptions options = builder.build(); - List<ServerAddress> addrs = new ArrayList<ServerAddress>(); + List<ServerAddress> addrs = new ArrayList<>(); addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address try { if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) { credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray()); - List<MongoCredential> credentialList = new ArrayList<MongoCredential>(); + List<MongoCredential> credentialList = new ArrayList<>(); credentialList.add(credential); mongoClient = new MongoClient(addrs, credentialList, options); } else { @@ -131,25 +137,32 @@ public class MongodbService { @PreDestroy public void cleanUp() { - mongoClient.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("mongoClient.close() at cleanUp."); + mongoClient.close(); + } finally { + config.getShutdownLock().readLock().unlock(); + } } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - if (dbReady == false)//TOD throw exception + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { + if (!dbReady)//TOD throw exception return; List<Document> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { //convert org.json JSONObject to MongoDB Document Document doc = Document.parse(json.toString()); - String id = topic.getMessageId(json); //id can be null + String id = effectiveTopic.getTopic().getMessageId(json); //id can be null if (id != null) { doc.put("_id", id); } documents.add(doc); } - String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . + String collectionName = effectiveTopic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); try { @@ -159,9 +172,11 @@ public class MongodbService { for (BulkWriteError bulkWriteError : bulkWriteErrors) { log.error("Failed record: {}", bulkWriteError); } + } catch (MongoTimeoutException e) { + log.error("saveJsons()", e); } - log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); + log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size()); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java index 8a177cc7..51d3168e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java @@ -28,6 +28,8 @@ import org.apache.velocity.VelocityContext; import org.apache.velocity.app.Velocity;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
@@ -59,6 +61,8 @@ import java.util.Map.Entry; @Getter
public class DruidSupervisorGenerator {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
Template template = null;
VelocityContext context;
@@ -90,7 +94,6 @@ public class DruidSupervisorGenerator { while (fields.hasNext()) {
Entry<String, JsonNode> field = fields.next();
- // System.out.println("--------"+field.getKey()+"--------");
printNode(prefix + "." + field.getKey(), field.getValue());
}
@@ -113,25 +116,13 @@ public class DruidSupervisorGenerator { private void printFlattenSpec(JsonNodeType type, String path) {
String name = path.substring(2).replace('.', ':');
// lets see what type the node is
- System.out.println("{");
- System.out.println("\"type\": \"path\",");
- System.out.println("\"name\": \"" + name + "\",");
- System.out.println("\"expr\": \"" + path + "\"");
- System.out.println("},");
+ log.info("{");
+ log.info("\"type\": \"path\",");
+ log.info("\"name\": \"" + name + "\",");
+ log.info("\"expr\": \"" + path + "\"");
+ log.info("},");
dimensions.add(new String[]{name, path});
- /*
- //for dimensionsSpec
- if (JsonNodeType.NUMBER.equals(type)) {
- System.out.println("{");
- System.out.println("\"type\": \"long\",");
- System.out.println("\"name\": \"" + name + "\",");
- System.out.println("},");
- } else {
- System.out.println("\"" + name + "\",");
-
- }
- */
}
public void doTopic(String topic) throws IOException {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java new file mode 100644 index 00000000..64b643ac --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/HttpClientUtil.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DCAE + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.util; + +import com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * HttpClient + * + * @author guochunmeng + * + */ +public class HttpClientUtil { + + private static final Logger log = LoggerFactory.getLogger(HttpClientUtil.class); + + private static final String KIBANA = "Kibana"; + + private static final String KIBANA_DASHBOARD_IMPORT = "KibanaDashboardImport"; + + private static final String ELASTICSEARCH_MAPPING_TEMPLATE = "ElasticsearchMappingTemplate"; + + private HttpClientUtil() { + throw new IllegalStateException("Utility class"); + } + + public static boolean sendHttpClientPost(String url, String json, String postFlag, String urlFlag) { + boolean flag = false; + RestTemplate restTemplate = new RestTemplate(); + HttpHeaders headers = new HttpHeaders(); + if (urlFlag.equals(KIBANA)) { + log.info("urlFlag is Kibana, add header"); + headers.add("kbn-xsrf","true"); + } + headers.setContentType(MediaType.APPLICATION_JSON_UTF8); + HttpEntity<String> request = new HttpEntity<>(json, headers); + ResponseEntity<String> responseEntity = null; + try { + responseEntity = restTemplate.postForEntity(url, request, String.class); + if (responseEntity.getStatusCodeValue() != 200) + throw new RestClientException("Resquest failed"); + Gson gson = new Gson(); + Map<String, Object> map = new HashMap<>(); + map = gson.fromJson(responseEntity.getBody(), map.getClass()); + switch (postFlag) { + case KIBANA_DASHBOARD_IMPORT: + flag = flagOfKibanaDashboardImport(map); + break; + case ELASTICSEARCH_MAPPING_TEMPLATE : + flag = flagOfPostEsMappingTemplate(map); + break; + default: + break; + } + } catch (Exception e) { + log.debug("Resquest failed: " + e.getMessage()); + } + return flag; + } + + private static boolean flagOfKibanaDashboardImport(Map<String, Object> map) { + + boolean flag = true; + List objectsList = (List) map.get("objects"); + + if (!objectsList.isEmpty()) { + Map<String, Object> map2 = null; + for (int i = 0; i < objectsList.size(); i++){ + map2 = (Map<String, Object>)objectsList.get(i); + for(String key : map2.keySet()){ + if ("error".equals(key)) { + return false; + } + } + } + } + return flag; + } + + private static boolean flagOfPostEsMappingTemplate(Map<String, Object> map) { + + boolean flag = true; + for(String key : map.keySet()){ + if ("acknowledged".equals(key) && (boolean) map.get("acknowledged")) { + break; + } else { + flag = false; + } + } + return flag; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java index db4dcfae..5c77d895 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java @@ -22,7 +22,6 @@ package org.onap.datalake.feeder.util; import java.util.HashMap; -import org.apache.commons.collections.CollectionUtils; import org.json.JSONArray; import org.json.JSONObject; |