From 027c75f93a010cd115fe5fa882646d6f258b566b Mon Sep 17 00:00:00 2001 From: ZhangZihao Date: Thu, 4 Jul 2019 10:33:32 +0800 Subject: kafka Change-Id: I3084858f2ddc06c42e062f65d65b4d3dec620fd7 Issue-ID: DCAEGEN2-1631 Signed-off-by: ZhangZihao --- .../feeder/controller/KafkaController.java | 149 +++++++++++++++++++++ .../org/onap/datalake/feeder/domain/Kafka.java | 22 +++ .../org/onap/datalake/feeder/domain/Topic.java | 8 ++ .../org/onap/datalake/feeder/dto/KafkaConfig.java | 66 +++++++++ .../org/onap/datalake/feeder/dto/TopicConfig.java | 1 + .../onap/datalake/feeder/service/KafkaService.java | 87 ++++++++++++ .../onap/datalake/feeder/service/TopicService.java | 21 +++ 7 files changed, 354 insertions(+) create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java (limited to 'components/datalake-handler/feeder') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java new file mode 100644 index 00000000..901adf29 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java @@ -0,0 +1,149 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.controller; + +import io.swagger.annotations.ApiOperation; +import org.onap.datalake.feeder.controller.domain.PostReturnBody; +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.dto.KafkaConfig; +import org.onap.datalake.feeder.repository.KafkaRepository; +import org.onap.datalake.feeder.service.KafkaService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; + +/** + * This controller manages kafka settings + * + * @author guochunmeng + */ +@RestController +@RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class KafkaController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private KafkaService kafkaService; + + @Autowired + private KafkaRepository kafkaRepository; + + @PostMapping("") + @ResponseBody + @ApiOperation(value="Create a kafka.") + public PostReturnBody createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); + return null; + } + + Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId()); + + if (oldKafka != null) { + sendError(response, 400, "kafka is exist "+kafkaConfig.getId()); + return null; + } else { + Kafka kafka = null; + try { + kafka = kafkaService.fillKafkaConfiguration(kafkaConfig); + } catch (Exception e) { + log.debug("FillKafkaConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage()); + return null; + } + kafkaRepository.save(kafka); + log.info("Kafka save successed"); + return mkPostReturnBody(200, kafka); + } + } + + @PutMapping("/{id}") + @ResponseBody + @ApiOperation(value="Update a kafka.") + public PostReturnBody updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); + return null; + } + + Kafka oldKafka = kafkaService.getKafkaById(id); + + if (oldKafka == null) { + sendError(response, 400, "Kafka not found: "+id); + return null; + } else { + try { + kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka); + } catch (Exception e) { + log.debug("FillKafkaConfiguration failed", e.getMessage()); + sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage()); + return null; + } + kafkaRepository.save(oldKafka); + log.info("kafka update successed"); + return mkPostReturnBody(200, oldKafka); + } + } + + @DeleteMapping("/{id}") + @ResponseBody + @ApiOperation(value="delete a kafka.") + public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{ + + Kafka oldKafka = kafkaService.getKafkaById(id); + if (oldKafka == null) { + sendError(response, 400, "kafka not found "+id); + } else { + kafkaRepository.delete(oldKafka); + response.setStatus(204); + } + } + + @GetMapping("") + @ResponseBody + @ApiOperation(value="List all Kafkas") + public List queryAllKafka(){ + return kafkaService.getAllKafka(); + } + + private PostReturnBody mkPostReturnBody(int statusCode, Kafka kafka) { + PostReturnBody retBody = new PostReturnBody<>(); + retBody.setStatusCode(statusCode); + retBody.setReturnBody(kafka.getKafkaConfig()); + return retBody; + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java index 26be942a..2741c638 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java @@ -32,6 +32,7 @@ import javax.persistence.Table; import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; +import org.onap.datalake.feeder.dto.KafkaConfig; /** @@ -123,4 +124,25 @@ public class Kafka { public int hashCode() { return id.hashCode(); } + + public KafkaConfig getKafkaConfig() { + KafkaConfig kafkaConfig = new KafkaConfig(); + + kafkaConfig.setId(getId()); + kafkaConfig.setBrokerList(getBrokerList()); + kafkaConfig.setConsumerCount(getConsumerCount()); + kafkaConfig.setEnabled(isEnabled()); + kafkaConfig.setExcludedTopic(getExcludedTopic()); + kafkaConfig.setGroup(getGroup()); + kafkaConfig.setIncludedTopic(getIncludedTopic()); + kafkaConfig.setLogin(getLogin()); + kafkaConfig.setName(getName()); + kafkaConfig.setPass(getPass()); + kafkaConfig.setSecure(isSecure()); + kafkaConfig.setSecurityProtocol(getSecurityProtocol()); + kafkaConfig.setTimeout(getTimeout()); + kafkaConfig.setZooKeeper(getZooKeeper()); + + return kafkaConfig; + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index a27b6756..c680e71b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -251,6 +251,14 @@ public class Topic { tConfig.setSinkdbs(dbList); tConfig.setEnabledSinkdbs(enabledDbList); + Set topicKafka = getKafkas(); + List kafkaList = new ArrayList<>(); + if (topicKafka != null) { + for (Kafka kafka : topicKafka) { + kafkaList.add(kafka.getId()); + } + } + tConfig.setSinkKafkas(kafkaList); return tConfig; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java new file mode 100644 index 00000000..b158d167 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 QCT + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.dto; + +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +/** + * JSON request body for Kafka Config. + * + * @author guochunmeng + * + */ +@Getter +@Setter +public class KafkaConfig { + + private String id; + + private String name; + + private boolean enabled; + + private String brokerList; + + private String zooKeeper; + + private String group; + + private boolean secure; + + private String login; + + private String pass; + + private String securityProtocol; + + private String includedTopic; + + private String excludedTopic; + + private Integer consumerCount; + + private Integer timeout; + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index ace7bfa9..942526d2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -55,6 +55,7 @@ public class TopicConfig { private String messageIdPath; private String aggregateArrayPath; private String flattenArrayPath; + private List sinkKafkas; @Override public String toString() { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java new file mode 100644 index 00000000..58ee9087 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.datalake.feeder.service; + +import org.onap.datalake.feeder.domain.Kafka; +import org.onap.datalake.feeder.dto.KafkaConfig; +import org.onap.datalake.feeder.repository.KafkaRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +/** + * Service for kafkas + * + * @author guochunmeng + */ +@Service +public class KafkaService { + + @Autowired + private KafkaRepository kafkaRepository; + + public Kafka getKafkaById(String id) { + + Optional ret = kafkaRepository.findById(id); + return ret.isPresent() ? ret.get() : null; + } + + public List getAllKafka() { + + List kafkaConfigList = new ArrayList<>(); + Iterable kafkaIterable = kafkaRepository.findAll(); + for(Kafka portal : kafkaIterable) { + kafkaConfigList.add(portal.getKafkaConfig()); + } + return kafkaConfigList; + } + + public Kafka fillKafkaConfiguration(KafkaConfig kafkaConfig) { + Kafka kafka = new Kafka(); + fillKafka(kafkaConfig, kafka); + return kafka; + } + + public void fillKafkaConfiguration(KafkaConfig kafkaConfig, Kafka kafka) { + fillKafka(kafkaConfig, kafka); + } + + private void fillKafka(KafkaConfig kafkaConfig, Kafka kafka) { + + kafka.setId(kafkaConfig.getId()); + kafka.setBrokerList(kafkaConfig.getBrokerList()); + kafka.setConsumerCount(kafkaConfig.getConsumerCount()); + kafka.setEnabled(kafkaConfig.isEnabled()); + kafka.setExcludedTopic(kafkaConfig.getExcludedTopic()); + kafka.setIncludedTopic(kafkaConfig.getIncludedTopic()); + kafka.setGroup(kafkaConfig.getGroup()); + kafka.setLogin(kafkaConfig.getLogin()); + kafka.setName(kafkaConfig.getName()); + kafka.setPass(kafkaConfig.getPass()); + kafka.setSecure(kafkaConfig.isSecure()); + kafka.setSecurityProtocol(kafkaConfig.getSecurityProtocol()); + kafka.setTimeout(kafkaConfig.getTimeout()); + kafka.setZooKeeper(kafkaConfig.getZooKeeper()); + + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 645160e2..73f6293f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -35,6 +35,7 @@ import org.onap.datalake.feeder.domain.EffectiveTopic; import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.onap.datalake.feeder.repository.TopicNameRepository; import org.onap.datalake.feeder.repository.TopicRepository; import org.onap.datalake.feeder.service.db.ElasticsearchService; @@ -68,6 +69,9 @@ public class TopicService { @Autowired private DbService dbService; + + @Autowired + private KafkaRepository kafkaRepository; public List getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException { @@ -166,6 +170,23 @@ public class TopicService { } else { topic.setDbs(relateDb); } + + Set relateKafka = new HashSet<>(); + if (tConfig.getSinkKafkas() != null) { + for (String item : tConfig.getSinkKafkas()) { + Optional sinkKafka = kafkaRepository.findById(item); + if (sinkKafka.isPresent()) { + relateKafka.add(sinkKafka.get()); + } + } + if (relateKafka.size() > 0) { + topic.setKafkas(relateKafka); + } else if (relateKafka.size() == 0) { + topic.getKafkas().clear(); + } + } else { + topic.setKafkas(relateKafka); + } } } -- cgit 1.2.3-korg