diff options
Diffstat (limited to 'components')
-rw-r--r-- | components/datalake-handler/feeder/Dockerfile (renamed from components/datalake-handler/feeder/src/assembly/Dockerfile) | 4 | ||||
-rw-r--r-- | components/datalake-handler/feeder/pom.xml | 68 | ||||
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java | 13 | ||||
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicNameService.java | 59 | ||||
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java | 2 | ||||
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/mod/config_handler.py | 78 | ||||
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/pmsh_service.py | 2 | ||||
-rw-r--r-- | components/pm-subscription-handler/setup.py | 3 | ||||
-rwxr-xr-x | components/pm-subscription-handler/tests/config_handler_test.py | 112 | ||||
-rwxr-xr-x | components/pm-subscription-handler/tests/expected_config.json | 68 | ||||
-rw-r--r-- | components/pm-subscription-handler/tox.ini | 2 |
11 files changed, 400 insertions, 11 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/Dockerfile b/components/datalake-handler/feeder/Dockerfile index 7cd6380c..20444f97 100644 --- a/components/datalake-handler/feeder/src/assembly/Dockerfile +++ b/components/datalake-handler/feeder/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:8-jre +FROM openjdk:8-jre-slim MAINTAINER Guobiao Mo <guobiaomo@chinamobile.com> @@ -12,7 +12,7 @@ USER datalake WORKDIR /home/datalake #add the fat jar -COPY target/feeder-1.0.0-SNAPSHOT.jar /home/datalake/ +COPY target/${JAR_FILE} /home/datalake/ COPY src/assembly/run.sh /home/datalake/ CMD ["sh", "run.sh"] diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index 560e1e71..67f62a0f 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -1,6 +1,5 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -15,108 +14,146 @@ <packaging>jar</packaging> <name>DataLake Feeder</name> + <properties> + <swagger.version>2.9.2</swagger.version> + <dockerfile-maven.version>1.4.13</dockerfile-maven.version> + <docker.image.path>onap/org.onap.dcaegen2.services.datalakefeeder</docker.image.path> + </properties> <dependencies> <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom2</artifactId> + <version>2.0.6</version> + </dependency> + + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-jdbc</artifactId> + <version>0.229</version> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.mariadb.jdbc</groupId> <artifactId>mariadb-java-client</artifactId> + <version>2.4.1</version> </dependency> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> + <version>20190722</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> + <version>4.5.10</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> + <version>2.3.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> + <version>${springboot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> + <version>${springboot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> + <version>${springboot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-couchbase</artifactId> + <version>${springboot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> + <version>${springboot.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> + <version>${springboot.version}</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>${elasticsearchjava.version}</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> + <version>2.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> + <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> + <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> + <version>2.8.2</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> + <version>1.18.10</version> + <scope>provided</scope> </dependency> <dependency> <groupId>io.druid</groupId> <artifactId>tranquility-core_2.11</artifactId> + <version>0.8.3</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> + <version>2.1</version> </dependency> @@ -136,26 +173,27 @@ <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> - <version>6.0.10.Final</version> + <version>6.1.0.Final</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> - <version>2.9.2</version> + <version>${swagger.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> - <version>2.9.2</version> + <version>${swagger.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> + <version>${mongojava.version}</version> </dependency> <dependency> <groupId>com.couchbase.mock</groupId> @@ -165,4 +203,24 @@ </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.spotify</groupId> + <artifactId>dockerfile-maven-plugin</artifactId> + <version>${dockerfile-maven.version}</version> + <configuration> + <!-- repository>docker.io/moguobiao/datalake-feeder-maven</repository--> + <repository>${onap.nexus.dockerregistry.daily}/${docker.image.path}</repository> + <tag>${project.version}</tag> + <dockerfile>Dockerfile</dockerfile> + <!-- useMavenSettingsForAuth>true</useMavenSettingsForAuth--> + <buildArgs> + <JAR_FILE>${project.build.finalName}.jar</JAR_FILE> + </buildArgs> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index a02cd6a2..3bdbcdba 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +61,9 @@ public class TopicConfigPollingService implements Runnable { @Autowired private KafkaRepository kafkaRepository; + @Autowired + private TopicNameService topicNameService; + //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic. private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>(); //private Map<String, TopicConfig> effectiveTopicConfigMap; @@ -114,7 +118,7 @@ public class TopicConfigPollingService implements Runnable { log.info("TopicConfigPollingService started."); while (active) { - try { //sleep first since we already pool in init() + try { //sleep first since we already called poll() in init() Thread.sleep(config.getCheckTopicInterval()); if(!active) { break; @@ -138,6 +142,7 @@ public class TopicConfigPollingService implements Runnable { log.info("activeTopics list is updated, new={}", newTopics); activeTopicMap.put(kafkaId, newTopics); + //update version currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1); } else { log.debug("activeTopics list is not updated."); @@ -156,14 +161,20 @@ public class TopicConfigPollingService implements Runnable { } private Map<Integer, Set<String>> poll() throws IOException { + Set<String> allTopicNames = new HashSet<>(); + Map<Integer, Set<String>> ret = new HashMap<>(); Iterable<Kafka> kafkas = kafkaRepository.findAll(); for (Kafka kafka : kafkas) { if (kafka.isEnabled()) { Set<String> topics = poll(kafka); ret.put(kafka.getId(), topics); + allTopicNames.addAll(topics); } } + + topicNameService.update(allTopicNames); + return ret; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicNameService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicNameService.java new file mode 100644 index 00000000..021d2c94 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicNameService.java @@ -0,0 +1,59 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.collections.CollectionUtils; +import org.onap.datalake.feeder.domain.TopicName; +import org.onap.datalake.feeder.repository.TopicNameRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service for TopicName + * + * @author Guobiao Mo + * + */ +@Service +public class TopicNameService { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private TopicNameRepository topicNameRepository; + + public void update(Collection<String> allTopicNames) { + + List<TopicName> all = allTopicNames.stream().map(s-> new TopicName(s)).collect(Collectors.toList()); + List<TopicName> allInDb = (List<TopicName>) topicNameRepository.findAll(); + + Collection<TopicName> additions = CollectionUtils.subtract(all, allInDb); + + if(!additions.isEmpty()) + topicNameRepository.saveAll(additions); + + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 3b1c2ccf..c26d9802 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -102,6 +102,8 @@ public class TopicService { return ret; } + // for unique topic string, one can create multiple 'topic' in admin UI. + // for example, one 'topic' setting correlates events, and sends data to ES, another 'topic' sends data to HDFS without such setting //TODO use query public List<Topic> findTopics(Kafka kafka, String topicStr) { List<Topic> ret = new ArrayList<>(); diff --git a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py new file mode 100755 index 00000000..e9edbca4 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py @@ -0,0 +1,78 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019-2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +import json +from os import environ + +import requests +from tenacity import retry, wait_fixed, stop_after_attempt + +from pmsh_service.mod import pmsh_logging as logger + + +class ConfigHandler: + """ Handles retrieval of PMSH's configuration from Configbinding service.""" + + def __init__(self): + self.cbs_url = f'http://{self.cbs_hostname}:{str(self.cbs_port)}/' \ + f'service_component_all/{self.hostname}' + self._config = None + + @property + def cbs_hostname(self): + return _get_environment_variable('CONFIG_BINDING_SERVICE_SERVICE_HOST') + + @property + def cbs_port(self): + return _get_environment_variable('CONFIG_BINDING_SERVICE_SERVICE_PORT') + + @property + def hostname(self): + return _get_environment_variable('HOSTNAME') + + @retry(wait=wait_fixed(2), stop=stop_after_attempt(5)) + def get_config(self): + """ Retrieves PMSH's configuration from Configbinding service. If a non-2xx response + is received, it retries after 2 seconds for 5 times before raising an exception. + + Returns: + dict: Dictionary representation of the the service configuration + + Raises: + Exception: If any error occurred pulling configuration from Configbinding service. + """ + if self._config is None: + logger.debug('No configuration found, pulling from Configbinding Service.') + try: + response = requests.get(self.cbs_url) + response.raise_for_status() + self._config = response.json() + logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}') + return json.loads(self._config) + except Exception as err: + raise Exception(f'Error retrieving configuration from CBS: {err}') + else: + return self._config + + +def _get_environment_variable(env_var_key): + try: + env_var = environ[env_var_key] + except KeyError as error: + raise KeyError(f'Environment variable {env_var_key} must be set. {error}') + return env_var diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service.py b/components/pm-subscription-handler/pmsh_service/pmsh_service.py index 6764c75f..8832f570 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019 Nordix Foundation. +# Copyright (C) 2019-2020 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/components/pm-subscription-handler/setup.py b/components/pm-subscription-handler/setup.py index 9012a04c..a4d9ada4 100644 --- a/components/pm-subscription-handler/setup.py +++ b/components/pm-subscription-handler/setup.py @@ -30,6 +30,5 @@ setup( python_requires='>=3', install_requires=[ "requests==2.22.0", - "aiohttp==3.6.2", - "onappylog==1.0.9"], + "tenacity==6.0.0"], ) diff --git a/components/pm-subscription-handler/tests/config_handler_test.py b/components/pm-subscription-handler/tests/config_handler_test.py new file mode 100755 index 00000000..fcc25d60 --- /dev/null +++ b/components/pm-subscription-handler/tests/config_handler_test.py @@ -0,0 +1,112 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019-2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +import json +import unittest +from os import environ +from os import path +from unittest.mock import patch + +import requests +import responses +from tenacity import wait_none + +from pmsh_service.mod.config_handler import ConfigHandler + + +class ConfigHandlerTest(unittest.TestCase): + + def setUp(self): + self.env_vars = {'CONFIG_BINDING_SERVICE_SERVICE_HOST': 'cbs_hostname', + 'CONFIG_BINDING_SERVICE_SERVICE_PORT': '10000', + 'HOSTNAME': 'hostname'} + for key, value in self.env_vars.items(): + environ[key] = value + self.cbs_url = 'http://cbs_hostname:10000/service_component_all/hostname' + self.expected_config = self._get_expected_config() + + def test_missing_environment_variable(self): + for key, value in self.env_vars.items(): + with self.assertRaises(KeyError): + environ.pop(key) + test_value = globals()[value] + test_value() + environ[key] = value + + @responses.activate + def test_get_config_success(self): + responses.add(responses.GET, self.cbs_url, json=json.dumps(self.expected_config), + status=200) + + config_handler = ConfigHandler() + config_handler.get_config.retry.wait = wait_none() + + self.assertEqual(self.expected_config, config_handler.get_config()) + + def test_get_config_already_exists(self): + config_handler = ConfigHandler() + expected_config = self._get_expected_config() + config_handler._config = expected_config + + with patch.object(requests, 'get') as mock_get_request: + actual_config = config_handler.get_config() + + self.assertEqual(0, mock_get_request.call_count) + self.assertEqual(expected_config, actual_config) + + @responses.activate + def test_get_config_error(self): + responses.add(responses.GET, self.cbs_url, status=404) + config_handler = ConfigHandler() + config_handler.get_config.retry.wait = wait_none() + + with self.assertRaises(Exception): + config_handler.get_config() + + @responses.activate + def test_get_config_max_retries_error(self): + retry_limit = 5 + config_handler = ConfigHandler() + config_handler.get_config.retry.wait = wait_none() + + for __ in range(retry_limit): + responses.add(responses.GET, self.cbs_url, status=500) + + with self.assertRaises(Exception): + config_handler.get_config() + self.assertEqual(retry_limit, len(responses.calls)) + + @responses.activate + def test_get_config_less_than_5_retries_success(self): + retry_attempts = 4 + responses.add(responses.GET, self.cbs_url, status=500) + responses.add(responses.GET, self.cbs_url, status=400) + responses.add(responses.GET, self.cbs_url, status=300) + responses.add(responses.GET, self.cbs_url, json=json.dumps(self.expected_config), + status=200) + + config_handler = ConfigHandler() + config_handler.get_config.retry.wait = wait_none() + config_handler.get_config() + + self.assertEqual(retry_attempts, len(responses.calls)) + + @staticmethod + def _get_expected_config(): + with open(path.join(path.dirname(__file__), 'expected_config.json'))as json_file: + return json.load(json_file) diff --git a/components/pm-subscription-handler/tests/expected_config.json b/components/pm-subscription-handler/tests/expected_config.json new file mode 100755 index 00000000..43f67e88 --- /dev/null +++ b/components/pm-subscription-handler/tests/expected_config.json @@ -0,0 +1,68 @@ +{ + "config":{}, + "policy": { + "subscription": { + "subscriptionName": "someExtraPM-AllKista-gNB-R2B", + "administrativeState": "UNLOCKED", + "fileBasedGP": 15, + "fileLocation": "c:\/\/PM", + "nfTypeModelInvariantId": "2829292", + "nfFilter": { + "swVersions": [ + "A21", + "B" + ], + "nfNames": [ + "ABC", + "DEF", + "foo.*" + ] + }, + "measurementGroups": [ + { + "measurementGroup": { + "measurementTypes": [ + { + "measurementType": "countera" + }, + { + "measurementType": "counterb" + } + ], + "managedObjectDNsBasic": [ + { + "DN": "dna" + }, + { + "DN": "dnb" + } + ] + } + }, + { + "measurementGroup": { + "measurementTypes": [ + { + "measurementType": "counterc" + }, + { + "measurementType": "counterd" + } + ], + "managedObjectDNsBasic": [ + { + "DN": "dnc" + }, + { + "DN": "dnd" + } + ] + } + } + ] + } +} + + + +}
\ No newline at end of file diff --git a/components/pm-subscription-handler/tox.ini b/components/pm-subscription-handler/tox.ini index fcbb647c..523338d1 100644 --- a/components/pm-subscription-handler/tox.ini +++ b/components/pm-subscription-handler/tox.ini @@ -26,6 +26,7 @@ deps= pytest coverage pytest-cov + responses==0.10.7 setenv = PYTHONPATH={toxinidir}/pmsh_service:{toxinidir}/pmsh_service/mod:{toxinidir}/tests commands= @@ -39,3 +40,4 @@ commands = flake8 pmsh_service tests [flake8] max-line-length=100 +ignore = E999 |