summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java14
1 files changed, 10 insertions, 4 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index 4dcad3e..2f2ab4d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -61,12 +61,14 @@ import java.util.UUID;
*/
@Data
public class DataRouterSubscriber implements HttpHandler {
+ public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+ public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
private static final int NUMBER_OF_ATTEMPTS = 5;
private static final int DEFAULT_TIMEOUT = 2000;
private static final int MAX_JITTER = 50;
- private static final String METADATA_HEADER = "X-ATT-DR-META";
private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
@@ -95,10 +97,11 @@ public class DataRouterSubscriber implements HttpHandler {
*/
public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
try {
- logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter");
+ logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ logger.unwrap().info("Successfully started DR Subscriber");
} finally {
- logger.unwrap().info(ONAPLogConstants.Markers.EXIT, "");
+ logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
}
}
@@ -128,6 +131,7 @@ public class DataRouterSubscriber implements HttpHandler {
subscriberObj.addProperty("lastMod", Instant.now().toString());
subscriberObj.addProperty("username", config.getBusControllerUserName());
subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
+ subscriberObj.addProperty("privilegedSubscriber", true);
return subscriberObj;
}
@@ -183,9 +187,11 @@ public class DataRouterSubscriber implements HttpHandler {
Map<String,String> mdc = MDC.getCopyOfContextMap();
EventMetadata metadata = getMetadata(httpServerExchange);
+ String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
httpServerExchange.getRequestReceiver()
.receiveFullString((callbackExchange, body) ->
- httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc)))
+ httpServerExchange.dispatch(() ->
+ eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
);
} catch (NoMetadataException exception) {
logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);