summaryrefslogtreecommitdiffstats
path: root/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
diff options
context:
space:
mode:
Diffstat (limited to 'datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java')
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java20
1 files changed, 13 insertions, 7 deletions
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
index 379e32d..a3825f6 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
@@ -21,6 +21,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
import org.onap.rapp.datacollector.entity.ves.Event;
@@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader {
@Override
public Collection<String> retrieveEvents() {
- logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl());
+ logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls());
+ return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic)
+ .collect(Collectors.toList());
+ }
+
+ private Stream<String> retrieveEventsFromTopic(String topic) {
try {
- ResponseEntity<String[]> responseEntity =
- restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET,
- new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())),
- String[].class);
+ ResponseEntity<String[]> responseEntity = restTemplate.exchange(topic, HttpMethod.GET,
+ new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(),
+ config.getDmaapProperties().getPassword())), String[].class);
if (responseEntity.hasBody()) {
String[] events = responseEntity.getBody();
- return Arrays.stream(events).collect(Collectors.toList());
+ return Arrays.stream(events);
}
} catch (RestClientException ex) {
logger.error("Failed to reach to dmaap", ex);
}
- return Collections.emptyList();
+
+ return Arrays.stream(new String[0]);
}
private HttpHeaders createHeaders(String username, String password) {