summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java14
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java28
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java2
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java3
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java53
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java16
7 files changed, 111 insertions, 7 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index e083466..11767e6 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -74,7 +74,7 @@ public class App {
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber)
+ .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber)
.add("get", "/healthcheck", healthCheckHandler))
.build().start();
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index 4dcad3e..2f2ab4d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -61,12 +61,14 @@ import java.util.UUID;
*/
@Data
public class DataRouterSubscriber implements HttpHandler {
+ public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+ public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
private static final int NUMBER_OF_ATTEMPTS = 5;
private static final int DEFAULT_TIMEOUT = 2000;
private static final int MAX_JITTER = 50;
- private static final String METADATA_HEADER = "X-ATT-DR-META";
private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
@@ -95,10 +97,11 @@ public class DataRouterSubscriber implements HttpHandler {
*/
public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
try {
- logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter");
+ logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ logger.unwrap().info("Successfully started DR Subscriber");
} finally {
- logger.unwrap().info(ONAPLogConstants.Markers.EXIT, "");
+ logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
}
}
@@ -128,6 +131,7 @@ public class DataRouterSubscriber implements HttpHandler {
subscriberObj.addProperty("lastMod", Instant.now().toString());
subscriberObj.addProperty("username", config.getBusControllerUserName());
subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
+ subscriberObj.addProperty("privilegedSubscriber", true);
return subscriberObj;
}
@@ -183,9 +187,11 @@ public class DataRouterSubscriber implements HttpHandler {
Map<String,String> mdc = MDC.getCopyOfContextMap();
EventMetadata metadata = getMetadata(httpServerExchange);
+ String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
httpServerExchange.getRequestReceiver()
.receiveFullString((callbackExchange, body) ->
- httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc)))
+ httpServerExchange.dispatch(() ->
+ eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
);
} catch (NoMetadataException exception) {
logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java
new file mode 100644
index 0000000..e8a2f11
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class ProcessEventException extends RuntimeException{
+ public ProcessEventException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index c2cacaa..7a7cb1f 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -38,4 +38,6 @@ public class Event {
private EventMetadata metadata;
@NonNull
private Map<String, String> mdc;
+ @NonNull
+ private String publishIdentity;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index 40327db..0412ece 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -78,6 +78,9 @@ public class MapperConfig {
return new URL(this.getBusControllerSubscriptionEndpoint());
}
+ public String getSubscriberIdentity(){
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
+ }
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
new file mode 100644
index 0000000..9525ec7
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+public class DataRouterUtils {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterUtils.class));
+
+ private DataRouterUtils(){
+ throw new IllegalStateException("Utility class;shouldn't be constructed");
+ }
+
+ /**
+ * Sends Delete to DR required as part of the new guaranteed delivery mechanism.
+ * @param config used to determine subscriber id and target endpoint
+ * @param event event to be processed
+ */
+ public static String processEvent(MapperConfig config, Event event){
+ logger.unwrap().info("Sending processed to DataRouter");
+ String baseDelete = config.getDmaapDRDeleteEndpoint();
+ String subscriberIdentity = config.getSubscriberIdentity();
+ String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
+ try {
+ return new RequestSender().send("DELETE", delete);
+ } catch (Exception exception) {
+ logger.unwrap().error("Process event failure", exception);
+ throw new ProcessEventException("Process event failure", exception);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
index 25519a0..3380aca 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
@@ -32,14 +32,13 @@ import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
-import lombok.extern.slf4j.Slf4j;
-
public class RequestSender {
private static final int MAX_RETRIES = 5;
private static final int RETRY_INTERVAL = 1000;
private static final String SERVER_ERROR_MESSAGE = "Error on Server";
private static final int ERROR_START_RANGE = 300;
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
+ public static final String DELETE = "DELETE";
/**
* Sends an Http GET request to a given endpoint.
@@ -50,6 +49,18 @@ public class RequestSender {
*/
public String send(final String urlString) throws Exception {
+ return send("GET", urlString);
+ }
+
+
+ /**
+ * Sends a request to a given endpoint.
+ * @param method of the outbound request
+ * @param urlString representing given endpoint
+ * @return http response body
+ * @throws Exception
+ */
+ public String send(String method, final String urlString) throws Exception {
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
final UUID requestID = UUID.randomUUID();
String result = "";
@@ -60,6 +71,7 @@ public class RequestSender {
connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
+ connection.setRequestMethod(method);
logger.unwrap()
.info("Sending:\n{}", connection.getRequestProperties());