summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authork.kedron <k.kedron@partner.samsung.com>2021-08-17 13:54:29 +0200
committerPawel Slowikowski <p.slowikows2@samsung.com>2021-08-31 08:57:17 +0000
commite26f1224fb91c1ca49f4b4b37a4a5de06ba3a7ad (patch)
tree12cd3614320a00dfb676449fe0599077315424b6
parent007b856430d1dbc11e21192913326071d3f3055f (diff)
Many topics support
Issue-ID: INT-1947 Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com> Change-Id: I1458c853ea7fabe8b393ec6f7e7ce8c5d13953f6
-rw-r--r--datacollector/README.md3
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java20
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java10
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java5
-rw-r--r--datacollector/src/main/resources/application.yml4
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java12
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java13
7 files changed, 45 insertions, 22 deletions
diff --git a/datacollector/README.md b/datacollector/README.md
index a6533a4..61bbae7 100644
--- a/datacollector/README.md
+++ b/datacollector/README.md
@@ -33,7 +33,8 @@ dmaap:
prtocol: "http"
host: "localhost"
port: 8181
- measurements-topic: "measurements"
+ measurements-topics:
+ - "measurements"
database:
host: mariadb-host
port: 3306
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
index 379e32d..a3825f6 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
@@ -21,6 +21,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
import org.onap.rapp.datacollector.entity.ves.Event;
@@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader {
@Override
public Collection<String> retrieveEvents() {
- logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl());
+ logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls());
+ return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic)
+ .collect(Collectors.toList());
+ }
+
+ private Stream<String> retrieveEventsFromTopic(String topic) {
try {
- ResponseEntity<String[]> responseEntity =
- restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET,
- new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())),
- String[].class);
+ ResponseEntity<String[]> responseEntity = restTemplate.exchange(topic, HttpMethod.GET,
+ new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(),
+ config.getDmaapProperties().getPassword())), String[].class);
if (responseEntity.hasBody()) {
String[] events = responseEntity.getBody();
- return Arrays.stream(events).collect(Collectors.toList());
+ return Arrays.stream(events);
}
} catch (RestClientException ex) {
logger.error("Failed to reach to dmaap", ex);
}
- return Collections.emptyList();
+
+ return Arrays.stream(new String[0]);
}
private HttpHeaders createHeaders(String username, String password) {
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
index 3560e89..a664e8e 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
@@ -13,6 +13,9 @@
package org.onap.rapp.datacollector.service.configuration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Getter;
@@ -28,10 +31,11 @@ public class DmaapProperties {
private String username;
private String password;
private int port;
- private String measurementsTopic;
+ private List<String> measurementsTopics = new ArrayList<>();
- public String getMeasurementsTopicUrl() {
- return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic);
+ public List<String> getMeasurementsTopicUrls() {
+ return measurementsTopics.stream().map(topic -> String.format("%s://%s:%d/%s", protocol, host, port, topic))
+ .collect(Collectors.toList());
}
}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
index ab1bd0a..a3b2536 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
@@ -23,6 +23,7 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.List;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
@@ -118,8 +119,8 @@ public class DmaapRestReaderConfiguration {
this.databaseProperties = databaseProperties;
}
- public String getMeasurementsTopicUrl() {
- return dmaapProperties.getMeasurementsTopicUrl();
+ public List<String> getMeasurementsTopicUrls() {
+ return dmaapProperties.getMeasurementsTopicUrls();
}
public DmaapProperties getDmaapProperties() {
diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml
index 502d8de..01919fa 100644
--- a/datacollector/src/main/resources/application.yml
+++ b/datacollector/src/main/resources/application.yml
@@ -6,7 +6,9 @@ dmaap:
port: 8181
username: dcae@dcae.onap.org
password: demo123456!
- measurements-topic: "measurements"
+ measurements-topics:
+ - measurements
+ - measurements2
database:
host: mariadb-host
port: 3306
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java
index 5a593f8..8a28358 100644
--- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java
@@ -16,6 +16,7 @@ package org.onap.rapp.datacollector.service;
import static org.junit.Assert.assertEquals;
+import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.onap.rapp.datacollector.service.configuration.DatabaseProperties;
@@ -35,7 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner;
@TestPropertySource(properties = {"dmaap.host=localhost",
"dmaap.protocol=http",
"dmaap.port=8080",
- "dmaap.measurements-topic=a-topic",
+ "dmaap.measurements-topics=a-topic,b-topic",
"database.url=jdbc:mysql://172.17.0.2:3306/ves?createDatabaseIfNotExist=true",
"database.username=root",
"database.password=mypass",
@@ -49,9 +50,12 @@ public class DmaapRestReaderConfigurationTest {
@Test
public void testUrlConstruction() {
- final String actual = config.getMeasurementsTopicUrl();
- final String expected = "http://localhost:8080/a-topic";
+ final List<String> actual = config.getMeasurementsTopicUrls();
+ final String expected1 = "http://localhost:8080/a-topic";
+ final String expected2 = "http://localhost:8080/b-topic";
- assertEquals(expected, actual);
+ assertEquals(2, actual.size());
+ assertEquals(expected1, actual.get(0));
+ assertEquals(expected2, actual.get(1));
}
}
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java
index 206fc2f..ebb2cbc 100644
--- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java
@@ -17,6 +17,7 @@ package org.onap.rapp.datacollector.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -54,18 +55,18 @@ class VesRetrievalServiceTest {
@Mock
UEHolder ueHolder;
- private static final String TOPIC_URL = "http://localhost/a-topic";
+ private static final List<String> TOPIC_URLS = Collections.singletonList("http://localhost/a-topic");
private VesRetrievalService service;
@BeforeEach
public void init() {
MockitoAnnotations.initMocks(this);
- Mockito.when(config.getMeasurementsTopicUrl()).thenReturn(TOPIC_URL);
+ Mockito.when(config.getMeasurementsTopicUrls()).thenReturn(TOPIC_URLS);
Mockito.when(config.getDmaapProperties()).thenReturn(getTestProperties());
String[] response = new String[]{"a", "b"};
- Mockito.when(restTemplate.exchange(TOPIC_URL, HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class))
+ Mockito.when(restTemplate.exchange(getTestTopicUrl(), HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class))
.thenReturn(new ResponseEntity<>(response, HttpStatus.OK));
service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
@@ -80,7 +81,7 @@ class VesRetrievalServiceTest {
@Test
void whenGetIsCalled_thenExceptionIsThrown() {
- Mockito.when(restTemplate.exchange(TOPIC_URL, HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class))
+ Mockito.when(restTemplate.exchange(getTestTopicUrl(), HttpMethod.GET, new HttpEntity<>(createTestHeaders()), String[].class))
.thenThrow(new RestClientException("An test exception"));
service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
@@ -125,5 +126,9 @@ class VesRetrievalServiceTest {
headers.setBasicAuth(getTestProperties().getUsername(), getTestProperties().getPassword());
return headers;
}
+
+ private String getTestTopicUrl() {
+ return TOPIC_URLS.get(0);
+ }
}