summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java3
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java28
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java69
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java8
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java68
6 files changed, 156 insertions, 22 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 9abe086..03d42d5 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
@@ -73,6 +74,7 @@ public class App {
Mapper mapper = new Mapper(mappingTemplate);
MeasSplitter splitter = new MeasSplitter(measConverter);
XMLValidator validator = new XMLValidator(xmlSchema);
+ VESPublisher vesPublisher = new VESPublisher(mapperConfig);
flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
@@ -86,6 +88,7 @@ public class App {
.concatMap(event -> App.split(splitter,event, mapperConfig))
.filter(events -> App.filter(filterHandler, events, mapperConfig))
.concatMap(events -> App.map(mapper, events, mapperConfig))
+ .concatMap(vesPublisher::publish)
.subscribe(events -> logger.unwrap().info("Event Processed"));
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index f37bcd3..19a4750 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -135,7 +135,7 @@ public class DataRouterSubscriber implements HttpHandler, Configurable {
private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
JsonObject subscriberObj = new JsonObject();
- subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+ subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
subscriberObj.addProperty("lastMod", Instant.now().toString());
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
new file mode 100644
index 0000000..6b5c157
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class MRPublisherException extends RuntimeException{
+ public MRPublisherException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
new file mode 100644
index 0000000..77b0545
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.messagerouter;
+
+import java.util.List;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+
+public class VESPublisher {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));
+ private RequestSender sender;
+ private MapperConfig config;
+
+ public VESPublisher(MapperConfig config) {
+ this(config, new RequestSender());
+ }
+
+ public VESPublisher(MapperConfig config, RequestSender sender) {
+ this.sender = sender;
+ this.config = config;
+ }
+
+ public Flux<Event> publish(List<Event> events) {
+ logger.unwrap().info("Publishing VES events to messagerouter.");
+ Event event = events.get(0);
+ try {
+ events.forEach(e -> this.publish(e.getVes()));
+ logger.unwrap().info("Successfully published VES events to messagerouter.");
+ } catch(MRPublisherException e) {
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage());
+ return Flux.empty();
+ }
+ return Flux.just(event);
+ }
+
+ private void publish(String ves) {
+ try {
+ String topicUrl = config.getPublisherTopicUrl();
+ sender.send("POST", topicUrl, ves);
+ } catch (Exception e) {
+ throw new MRPublisherException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index d28d850..ffb09ba 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -82,6 +82,14 @@ public class MapperConfig {
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+ public String getSubscriberDcaeLocation() {
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
+ }
+
+ public String getPublisherTopicUrl() {
+ return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();
+ }
+
public boolean dmaapInfoEquals(MapperConfig mapperConfig){
return this
.getStreamsSubscribes()
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
index 3380aca..658f820 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
@@ -20,10 +20,13 @@
package org.onap.dcaegen2.services.pmmapper.utils;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -39,41 +42,47 @@ public class RequestSender {
private static final int ERROR_START_RANGE = 300;
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
public static final String DELETE = "DELETE";
+ public static final String DEFAULT_CONTENT_TYPE = "text/plain";
+ public static final int DEFAULT_READ_TIMEOUT = 5000;
/**
- * Sends an Http GET request to a given endpoint.
- *
- * @return http response body
- * @throws Exception
- * @throws InterruptedException
+ * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }
+ * is set to {@code GET} by default.
+ * @see RequestSender#send(String,String,String)
*/
-
public String send(final String urlString) throws Exception {
return send("GET", urlString);
}
+ /**
+ * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body }
+ * is set to empty String by default.
+ * @see RequestSender#send(String,String,String)
+ */
+ public String send(String method, final String urlString) throws Exception {
+ return send(method,urlString,"");
+ }
/**
- * Sends a request to a given endpoint.
+ * Sends an http request to a given endpoint.
* @param method of the outbound request
* @param urlString representing given endpoint
+ * @param body of the request as json
* @return http response body
* @throws Exception
*/
- public String send(String method, final String urlString) throws Exception {
+ public String send(String method, final String urlString, final String body) throws Exception {
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
final UUID requestID = UUID.randomUUID();
String result = "";
for (int i = 1; i <= MAX_RETRIES; i++) {
- URL url = new URL(urlString);
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
- connection.setRequestMethod(method);
- logger.unwrap()
- .info("Sending:\n{}", connection.getRequestProperties());
+ final URL url = new URL(urlString);
+ final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);
+ if(!body.isEmpty()) {
+ setMessageBody(connection, body);
+ }
+ logger.unwrap().info("Sending {} request to {}.", method, urlString);
try (InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
@@ -81,15 +90,12 @@ public class RequestSender {
.collect(Collectors.joining("\n"));
int responseCode = connection.getResponseCode();
if (!(isWithinErrorRange(responseCode))) {
- logger.unwrap()
- .info("Received:\n{}", result);
+ logger.unwrap().info("Server Response Received:\n{}", result);
break;
}
-
} catch (Exception e) {
if (retryLimitReached(i)) {
- logger.unwrap()
- .error("Execution error: "+connection.getResponseMessage(), e);
+ logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e);
throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);
}
}
@@ -99,6 +105,26 @@ public class RequestSender {
return result;
}
+ private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception {
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setReadTimeout(DEFAULT_READ_TIMEOUT);
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
+ connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
+ connection.setRequestMethod(method);
+
+ return connection;
+ }
+
+ private void setMessageBody(HttpURLConnection connection, String body) throws IOException {
+ connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);
+ connection.setDoOutput(true);
+ OutputStream outputStream = connection.getOutputStream();
+ outputStream.write(body.getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ outputStream.close();
+ }
+
private boolean retryLimitReached(final int retryCount) {
return retryCount >= MAX_RETRIES;
}