diff options
author | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-01 14:09:52 +0000 |
---|---|---|
committer | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-01 14:09:52 +0000 |
commit | e5013912e5e1f62cd4ae9ee77fa765ed664c7d2f (patch) | |
tree | 8b99b18baf0895f7be690833129e2d07f1bbfc83 /src | |
parent | b1fee35cd69ff8019c9deb38b482f24aa20a2ddd (diff) |
Update DataRouter Subscriber
*Update metadata header key to match new datarouter specification
*Update subscriber to be a privileged subscriber
*Update subscriber to improve logging & remove sonar smells
*Update delivery end point to match datarouter specification
*Update event to include the publish id provided by datarouter
*Add datarouter event processed utility
Issue-ID: DCAEGEN2-1038
Change-Id: Iafce544f31f888de53547de8b280faebd8075d4c
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src')
10 files changed, 252 insertions, 15 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index e083466..11767e6 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -74,7 +74,7 @@ public class App { Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber) + .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber) .add("get", "/healthcheck", healthCheckHandler)) .build().start(); } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java index 4dcad3e..2f2ab4d 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -61,12 +61,14 @@ import java.util.UUID; */ @Data public class DataRouterSubscriber implements HttpHandler { + public static final String METADATA_HEADER = "X-DMAAP-DR-META"; + public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID"; + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class)); private static final int NUMBER_OF_ATTEMPTS = 5; private static final int DEFAULT_TIMEOUT = 2000; private static final int MAX_JITTER = 50; - private static final String METADATA_HEADER = "X-ATT-DR-META"; private static final String BAD_METADATA_MESSAGE = "Malformed Metadata."; private static final String NO_METADATA_MESSAGE = "Missing Metadata."; @@ -95,10 +97,11 @@ public class DataRouterSubscriber implements HttpHandler { */ public void start(MapperConfig config) throws TooManyTriesException, InterruptedException { try { - logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter"); + logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY); subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + logger.unwrap().info("Successfully started DR Subscriber"); } finally { - logger.unwrap().info(ONAPLogConstants.Markers.EXIT, ""); + logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT); } } @@ -128,6 +131,7 @@ public class DataRouterSubscriber implements HttpHandler { subscriberObj.addProperty("lastMod", Instant.now().toString()); subscriberObj.addProperty("username", config.getBusControllerUserName()); subscriberObj.addProperty("userpwd", config.getBusControllerPassword()); + subscriberObj.addProperty("privilegedSubscriber", true); return subscriberObj; } @@ -183,9 +187,11 @@ public class DataRouterSubscriber implements HttpHandler { Map<String,String> mdc = MDC.getCopyOfContextMap(); EventMetadata metadata = getMetadata(httpServerExchange); + String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst(); httpServerExchange.getRequestReceiver() .receiveFullString((callbackExchange, body) -> - httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc))) + httpServerExchange.dispatch(() -> + eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity))) ); } catch (NoMetadataException exception) { logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java new file mode 100644 index 0000000..e8a2f11 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class ProcessEventException extends RuntimeException{ + public ProcessEventException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index c2cacaa..7a7cb1f 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -38,4 +38,6 @@ public class Event { private EventMetadata metadata; @NonNull private Map<String, String> mdc; + @NonNull + private String publishIdentity; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java index 40327db..0412ece 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java @@ -78,6 +78,9 @@ public class MapperConfig { return new URL(this.getBusControllerSubscriptionEndpoint());
}
+ public String getSubscriberIdentity(){
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
+ }
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java new file mode 100644 index 0000000..9525ec7 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java @@ -0,0 +1,53 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +public class DataRouterUtils { + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterUtils.class)); + + private DataRouterUtils(){ + throw new IllegalStateException("Utility class;shouldn't be constructed"); + } + + /** + * Sends Delete to DR required as part of the new guaranteed delivery mechanism. + * @param config used to determine subscriber id and target endpoint + * @param event event to be processed + */ + public static String processEvent(MapperConfig config, Event event){ + logger.unwrap().info("Sending processed to DataRouter"); + String baseDelete = config.getDmaapDRDeleteEndpoint(); + String subscriberIdentity = config.getSubscriberIdentity(); + String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity()); + try { + return new RequestSender().send("DELETE", delete); + } catch (Exception exception) { + logger.unwrap().error("Process event failure", exception); + throw new ProcessEventException("Process event failure", exception); + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java index 25519a0..3380aca 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java @@ -32,14 +32,13 @@ import org.onap.logging.ref.slf4j.ONAPLogAdapter; import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
-import lombok.extern.slf4j.Slf4j;
-
public class RequestSender {
private static final int MAX_RETRIES = 5;
private static final int RETRY_INTERVAL = 1000;
private static final String SERVER_ERROR_MESSAGE = "Error on Server";
private static final int ERROR_START_RANGE = 300;
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
+ public static final String DELETE = "DELETE";
/**
* Sends an Http GET request to a given endpoint.
@@ -50,6 +49,18 @@ public class RequestSender { */
public String send(final String urlString) throws Exception {
+ return send("GET", urlString);
+ }
+
+
+ /**
+ * Sends a request to a given endpoint.
+ * @param method of the outbound request
+ * @param urlString representing given endpoint
+ * @return http response body
+ * @throws Exception
+ */
+ public String send(String method, final String urlString) throws Exception {
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
final UUID requestID = UUID.randomUUID();
String result = "";
@@ -60,6 +71,7 @@ public class RequestSender { connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
+ connection.setRequestMethod(method);
logger.unwrap()
.info("Sending:\n{}", connection.getRequestProperties());
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java index ad73b63..fdc1bf6 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java @@ -20,6 +20,8 @@ package org.onap.dcaegen2.services.pmmapper.datarouter; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; @@ -29,19 +31,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import io.undertow.io.Receiver; import io.undertow.io.Sender; import io.undertow.server.HttpServerExchange; import io.undertow.util.HeaderMap; import io.undertow.util.StatusCodes; -import utils.LoggingUtils; import java.io.IOException; import java.net.HttpURLConnection; @@ -56,12 +56,13 @@ import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; -import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import utils.LoggingUtils; @RunWith(PowerMockRunner.class) @PrepareForTest(DataRouterSubscriber.class) @@ -218,8 +219,9 @@ public class DataRouterSubscriberTest { String testString = "MESSAGE BODY"; JsonObject metadata = new JsonParser().parse( new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject(); - when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())) + when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt())) .thenReturn(metadata.toString()); + when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn(""); doAnswer((Answer<Void>) invocationOnMock -> { Receiver.FullStringCallback callback = invocationOnMock.getArgument(0); callback.handle(httpServerExchange, testString); diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java new file mode 100644 index 0000000..73967c2 --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; +import utils.EventUtils; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest(RequestSender.class) +public class DataRouterUtilsTest { + + @Test + public void processEventSuccessful() throws Exception { + String serviceResponse = "I'm a service response ;)"; + String publishIdentity = "12"; + PowerMockito.mockStatic(Thread.class); + MapperConfig mockMapperConfig = mock(MapperConfig.class); + URL mockURL = mock(URL.class); + HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(mockConnection.getResponseCode()).thenReturn(200); + when(mockConnection.getInputStream()).thenReturn(new ByteArrayInputStream(serviceResponse.getBytes())); + + when(mockURL.openConnection()).thenReturn(mockConnection); + when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/"); + when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12"); + + PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); + + Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); + assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); + verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE); + } + + @Test + public void testNegativeResponse() throws Exception { + String serviceResponse = "I'm a negative service response ;)"; + String publishIdentity = "12"; + PowerMockito.mockStatic(Thread.class); + MapperConfig mockMapperConfig = mock(MapperConfig.class); + URL mockURL = mock(URL.class); + HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(mockConnection.getResponseCode()).thenReturn(503); + when(mockConnection.getInputStream()) + .thenAnswer(invocationOnMock -> new ByteArrayInputStream(serviceResponse.getBytes())); + + when(mockURL.openConnection()).thenReturn(mockConnection); + when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/"); + when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12"); + + PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); + Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity); + assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent)); + verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE); + } + + @Test + public void testConstructionException() { + assertThrows(IllegalStateException.class, () -> Whitebox.invokeConstructor(DataRouterUtils.class)); + } + + @Test + public void testProcessEventFailure() throws Exception { + PowerMockito.mockStatic(Thread.class); + MapperConfig mockMapperConfig = mock(MapperConfig.class); + URL mockURL = mock(URL.class); + HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(mockConnection.getResponseCode()).thenReturn(503); + + when(mockURL.openConnection()).thenReturn(mockConnection); + when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/"); + when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12"); + + PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); + Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class)); + assertThrows(ProcessEventException.class, () -> DataRouterUtils.processEvent(mockMapperConfig, testEvent)); + } +} diff --git a/src/test/java/utils/EventUtils.java b/src/test/java/utils/EventUtils.java index 90317c2..a6b131c 100644 --- a/src/test/java/utils/EventUtils.java +++ b/src/test/java/utils/EventUtils.java @@ -75,11 +75,26 @@ public class EventUtils { } /** + * Makes an event with a mock http server exchange, empty mdc and publish identity * @param body body for the event. * @param eventMetadata metadata for the event. * @return event with mock HttpServerExchange */ public static Event makeMockEvent(String body, EventMetadata eventMetadata) { - return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>()); + return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>(), ""); } + + + /** + * Makes an event with a mock http server exchange and empty mdc + * @param body body for the event. + * @param eventMetadata metadata for the event. + * @return event with mock HttpServerExchange + */ + public static Event makeMockEvent(String body, EventMetadata eventMetadata, String publishIdentity) { + HttpServerExchange mockHttpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + return new Event(mockHttpServerExchange, body, eventMetadata, new HashMap<>(), publishIdentity); + } + + } |