diff options
7 files changed, 45 insertions, 22 deletions
diff --git a/datacollector/README.md b/datacollector/README.md index a6533a4..61bbae7 100644 --- a/datacollector/README.md +++ b/datacollector/README.md @@ -33,7 +33,8 @@ dmaap: prtocol: "http" host: "localhost" port: 8181 - measurements-topic: "measurements" + measurements-topics: + - "measurements" database: host: mariadb-host port: 3306 diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java index 379e32d..a3825f6 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; import org.onap.rapp.datacollector.entity.ves.Event; @@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader { @Override public Collection<String> retrieveEvents() { - logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl()); + logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls()); + return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic) + .collect(Collectors.toList()); + } + + private Stream<String> retrieveEventsFromTopic(String topic) { try { - ResponseEntity<String[]> responseEntity = - restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET, - new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())), - String[].class); + ResponseEntity<String[]> responseEntity = restTemplate.exchange(topic, HttpMethod.GET, + new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), + config.getDmaapProperties().getPassword())), String[].class); if (responseEntity.hasBody()) { String[] events = responseEntity.getBody(); - return Arrays.stream(events).collect(Collectors.toList()); + return Arrays.stream(events); } } catch (RestClientException ex) { logger.error("Failed to reach to dmaap", ex); } - return Collections.emptyList(); + + return Arrays.stream(new String[0]); } private HttpHeaders createHeaders(String username, String password) { diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java index 3560e89..a664e8e 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java @@ -13,6 +13,9 @@ package org.onap.rapp.datacollector.service.configuration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.springframework.boot.context.properties.ConfigurationProperties; import lombok.Getter; @@ -28,10 +31,11 @@ public class DmaapProperties { private String username; private String password; private int port; - private String measurementsTopic; + private List<String> measurementsTopics = new ArrayList<>(); - public String getMeasurementsTopicUrl() { - return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic); + public List<String> getMeasurementsTopicUrls() { + return measurementsTopics.stream().map(topic -> String.format("%s://%s:%d/%s", protocol, host, port, topic)) + .collect(Collectors.toList()); } } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java index ab1bd0a..a3b2536 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java @@ -23,6 +23,7 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.List; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; @@ -118,8 +119,8 @@ public class DmaapRestReaderConfiguration { this.databaseProperties = databaseProperties; } - public String getMeasurementsTopicUrl() { - return dmaapProperties.getMeasurementsTopicUrl(); + public List<String> getMeasurementsTopicUrls() { + return dmaapProperties.getMeasurementsTopicUrls(); } public DmaapProperties getDmaapProperties() { diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml index 502d8de..01919fa 100644 --- a/datacollector/src/main/resources/application.yml +++ b/datacollector/src/main/resources/application.yml @@ -6,7 +6,9 @@ dmaap: port: 8181 username: dcae@dcae.onap.org password: demo123456! - measurements-topic: "measurements" + measurements-topics: + - measurements + - measurements2 database: host: mariadb-host port: 3306 diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java index 5a593f8..8a28358 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java @@ -16,6 +16,7 @@ package org.onap.rapp.datacollector.service; import static org.junit.Assert.assertEquals; +import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.onap.rapp.datacollector.service.configuration.DatabaseProperties; @@ -35,7 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner; @TestPropertySource(properties = {"dmaap.host=localhost", "dmaap.protocol=http", "dmaap.port=8080", - "dmaap.measurements-topic=a-topic", + "dmaap.measurements-topics=a-topic,b-topic", "database.url=jdbc:mysql://172.17.0.2:3306/ves?createDatabaseIfNotExist=true", "database.username=root", "database.password=mypass", @@ -49,9 +50,12 @@ public class DmaapRestReaderConfigurationTest { @Test public void testUrlConstruction() { - final String actual = config.getMeasurementsTopicUrl(); - final String expected = "http://localhost:8080/a-topic"; + final List<String> actual = config.getMeasurementsTopicUrls(); + final String expected1 = "http://localhost:8080/a-topic"; + final String expected2 = "http://localhost:8080/b-topic"; - assertEquals(expected, actual); + assertEquals(2, actual.size()); + assertEquals(expected1, actual.get(0)); + assertEquals(expected2, actual.get(1)); } } diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java index 206fc2f..ebb2cbc 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java @@ -17,6 +17,7 @@ package org.onap.rapp.datacollector.service; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -54,18 +55,18 @@ class VesRetrievalServiceTest { @Mock UEHolder ueHolder; - private static final String TOPIC_URL = "http://localhost/a-topic"; + private static final List<String> TOPIC_URLS = Collections.singletonList("http://localhost/a-topic"); private VesRetrievalService service; @BeforeEach public void init() { MockitoAnnotations.initMocks(this); - Mockito.when(config.getMeasurementsTopicUrl()).thenReturn(TOPIC_URL); + Mockito.when(config.getMeasurementsTopicUrls()).thenReturn(TOPIC_URLS); Mockito.when(config.getDmaapProperties()).thenReturn(getTestProperties()); String[] response = new String[]{"a", "b"}; - Mockito.when(restTemplate.exchange(TOPIC_URL, HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class)) + Mockito.when(restTemplate.exchange(getTestTopicUrl(), HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class)) .thenReturn(new ResponseEntity<>(response, HttpStatus.OK)); service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder); @@ -80,7 +81,7 @@ class VesRetrievalServiceTest { @Test void whenGetIsCalled_thenExceptionIsThrown() { - Mockito.when(restTemplate.exchange(TOPIC_URL, HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class)) + Mockito.when(restTemplate.exchange(getTestTopicUrl(), HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class)) .thenThrow(new RestClientException("An test exception")); service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder); @@ -125,5 +126,9 @@ class VesRetrievalServiceTest { headers.setBasicAuth(getTestProperties().getUsername(), getTestProperties().getPassword()); return headers; } + + private String getTestTopicUrl() { + return TOPIC_URLS.get(0); + } } |