diff options
author | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-01 14:09:52 +0000 |
---|---|---|
committer | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-01 14:09:52 +0000 |
commit | e5013912e5e1f62cd4ae9ee77fa765ed664c7d2f (patch) | |
tree | 8b99b18baf0895f7be690833129e2d07f1bbfc83 /src/main | |
parent | b1fee35cd69ff8019c9deb38b482f24aa20a2ddd (diff) |
Update DataRouter Subscriber
*Update metadata header key to match new datarouter specification
*Update subscriber to be a privileged subscriber
*Update subscriber to improve logging & remove sonar smells
*Update delivery end point to match datarouter specification
*Update event to include the publish id provided by datarouter
*Add datarouter event processed utility
Issue-ID: DCAEGEN2-1038
Change-Id: Iafce544f31f888de53547de8b280faebd8075d4c
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src/main')
7 files changed, 111 insertions, 7 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index e083466..11767e6 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -74,7 +74,7 @@ public class App { Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber) + .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber) .add("get", "/healthcheck", healthCheckHandler)) .build().start(); } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java index 4dcad3e..2f2ab4d 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -61,12 +61,14 @@ import java.util.UUID; */ @Data public class DataRouterSubscriber implements HttpHandler { + public static final String METADATA_HEADER = "X-DMAAP-DR-META"; + public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID"; + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class)); private static final int NUMBER_OF_ATTEMPTS = 5; private static final int DEFAULT_TIMEOUT = 2000; private static final int MAX_JITTER = 50; - private static final String METADATA_HEADER = "X-ATT-DR-META"; private static final String BAD_METADATA_MESSAGE = "Malformed Metadata."; private static final String NO_METADATA_MESSAGE = "Missing Metadata."; @@ -95,10 +97,11 @@ public class DataRouterSubscriber implements HttpHandler { */ public void start(MapperConfig config) throws TooManyTriesException, InterruptedException { try { - logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter"); + logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY); subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + logger.unwrap().info("Successfully started DR Subscriber"); } finally { - logger.unwrap().info(ONAPLogConstants.Markers.EXIT, ""); + logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT); } } @@ -128,6 +131,7 @@ public class DataRouterSubscriber implements HttpHandler { subscriberObj.addProperty("lastMod", Instant.now().toString()); subscriberObj.addProperty("username", config.getBusControllerUserName()); subscriberObj.addProperty("userpwd", config.getBusControllerPassword()); + subscriberObj.addProperty("privilegedSubscriber", true); return subscriberObj; } @@ -183,9 +187,11 @@ public class DataRouterSubscriber implements HttpHandler { Map<String,String> mdc = MDC.getCopyOfContextMap(); EventMetadata metadata = getMetadata(httpServerExchange); + String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst(); httpServerExchange.getRequestReceiver() .receiveFullString((callbackExchange, body) -> - httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc))) + httpServerExchange.dispatch(() -> + eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity))) ); } catch (NoMetadataException exception) { logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java new file mode 100644 index 0000000..e8a2f11 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class ProcessEventException extends RuntimeException{ + public ProcessEventException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index c2cacaa..7a7cb1f 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -38,4 +38,6 @@ public class Event { private EventMetadata metadata; @NonNull private Map<String, String> mdc; + @NonNull + private String publishIdentity; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java index 40327db..0412ece 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java @@ -78,6 +78,9 @@ public class MapperConfig { return new URL(this.getBusControllerSubscriptionEndpoint());
}
+ public String getSubscriberIdentity(){
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
+ }
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java new file mode 100644 index 0000000..9525ec7 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java @@ -0,0 +1,53 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +public class DataRouterUtils { + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterUtils.class)); + + private DataRouterUtils(){ + throw new IllegalStateException("Utility class;shouldn't be constructed"); + } + + /** + * Sends Delete to DR required as part of the new guaranteed delivery mechanism. + * @param config used to determine subscriber id and target endpoint + * @param event event to be processed + */ + public static String processEvent(MapperConfig config, Event event){ + logger.unwrap().info("Sending processed to DataRouter"); + String baseDelete = config.getDmaapDRDeleteEndpoint(); + String subscriberIdentity = config.getSubscriberIdentity(); + String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); + try { + return new RequestSender().send("DELETE", delete); + } catch (Exception exception) { + logger.unwrap().error("Process event failure", exception); + throw new ProcessEventException("Process event failure", exception); + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java index 25519a0..3380aca 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java @@ -32,14 +32,13 @@ import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
-import lombok.extern.slf4j.Slf4j;
-
public class RequestSender {
private static final int MAX_RETRIES = 5;
private static final int RETRY_INTERVAL = 1000;
private static final String SERVER_ERROR_MESSAGE = "Error on Server";
private static final int ERROR_START_RANGE = 300;
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
+ public static final String DELETE = "DELETE";
/**
* Sends an Http GET request to a given endpoint.
@@ -50,6 +49,18 @@ public class RequestSender { */
public String send(final String urlString) throws Exception {
+ return send("GET", urlString);
+ }
+
+
+ /**
+ * Sends a request to a given endpoint.
+ * @param method of the outbound request
+ * @param urlString representing given endpoint
+ * @return http response body
+ * @throws Exception
+ */
+ public String send(String method, final String urlString) throws Exception {
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
final UUID requestID = UUID.randomUUID();
String result = "";
@@ -60,6 +71,7 @@ public class RequestSender { connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
+ connection.setRequestMethod(method);
logger.unwrap()
.info("Sending:\n{}", connection.getRequestProperties());
|