From ab26367e1e54674d6eb597b005b15304b6502e88 Mon Sep 17 00:00:00 2001 From: emartin Date: Fri, 22 Mar 2019 16:45:31 +0000 Subject: Publish VES event to MessageRouter via http Change-Id: Ic5ed1fad1182e7343f213488c4015d2683ab8ddc Issue-ID: DCAEGEN2-1273 Signed-off-by: emartin --- .../org/onap/dcaegen2/services/pmmapper/App.java | 3 + .../pmmapper/datarouter/DataRouterSubscriber.java | 2 +- .../pmmapper/exceptions/MRPublisherException.java | 28 +++++++ .../pmmapper/messagerouter/VESPublisher.java | 69 +++++++++++++++++ .../services/pmmapper/model/MapperConfig.java | 8 ++ .../services/pmmapper/utils/RequestSender.java | 68 ++++++++++++----- .../pmmapper/messagerouter/VESPublisherTest.java | 89 ++++++++++++++++++++++ 7 files changed, 245 insertions(+), 22 deletions(-) create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java create mode 100644 src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java (limited to 'src') diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 9abe086..03d42d5 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -38,6 +38,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; +import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler; @@ -73,6 +74,7 @@ public class App { Mapper mapper = new Mapper(mappingTemplate); MeasSplitter splitter = new MeasSplitter(measConverter); XMLValidator validator = new XMLValidator(xmlSchema); + VESPublisher vesPublisher = new VESPublisher(mapperConfig); flux.onBackpressureDrop(App::handleBackPressure) .doOnNext(App::receiveRequest) @@ -86,6 +88,7 @@ public class App { .concatMap(event -> App.split(splitter,event, mapperConfig)) .filter(events -> App.filter(filterHandler, events, mapperConfig)) .concatMap(events -> App.map(mapper, events, mapperConfig)) + .concatMap(vesPublisher::publish) .subscribe(events -> logger.unwrap().info("Event Processed")); DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java index f37bcd3..19a4750 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -135,7 +135,7 @@ public class DataRouterSubscriber implements HttpHandler, Configurable { private JsonObject getBusControllerSubscribeBody(MapperConfig config) { JsonObject subscriberObj = new JsonObject(); - subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation()); + subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation()); subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl()); subscriberObj.addProperty("feedId", config.getDmaapDRFeedId()); subscriberObj.addProperty("lastMod", Instant.now().toString()); diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java new file mode 100644 index 0000000..6b5c157 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class MRPublisherException extends RuntimeException{ + public MRPublisherException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java new file mode 100644 index 0000000..77b0545 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.messagerouter; + +import java.util.List; + +import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; + +public class VESPublisher { + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class)); + private RequestSender sender; + private MapperConfig config; + + public VESPublisher(MapperConfig config) { + this(config, new RequestSender()); + } + + public VESPublisher(MapperConfig config, RequestSender sender) { + this.sender = sender; + this.config = config; + } + + public Flux publish(List events) { + logger.unwrap().info("Publishing VES events to messagerouter."); + Event event = events.get(0); + try { + events.forEach(e -> this.publish(e.getVes())); + logger.unwrap().info("Successfully published VES events to messagerouter."); + } catch(MRPublisherException e) { + logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage()); + return Flux.empty(); + } + return Flux.just(event); + } + + private void publish(String ves) { + try { + String topicUrl = config.getPublisherTopicUrl(); + sender.send("POST", topicUrl, ves); + } catch (Exception e) { + throw new MRPublisherException(e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java index d28d850..ffb09ba 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java @@ -82,6 +82,14 @@ public class MapperConfig { return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId(); } + public String getSubscriberDcaeLocation() { + return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation(); + } + + public String getPublisherTopicUrl() { + return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl(); + } + public boolean dmaapInfoEquals(MapperConfig mapperConfig){ return this .getStreamsSubscribes() diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java index 3380aca..658f820 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java @@ -20,10 +20,13 @@ package org.onap.dcaegen2.services.pmmapper.utils; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.stream.Collectors; @@ -39,41 +42,47 @@ public class RequestSender { private static final int ERROR_START_RANGE = 300; private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class)); public static final String DELETE = "DELETE"; + public static final String DEFAULT_CONTENT_TYPE = "text/plain"; + public static final int DEFAULT_READ_TIMEOUT = 5000; /** - * Sends an Http GET request to a given endpoint. - * - * @return http response body - * @throws Exception - * @throws InterruptedException + * Works just like {@link RequestSender#send(method,urlString)}, except {@code method } + * is set to {@code GET} by default. + * @see RequestSender#send(String,String,String) */ - public String send(final String urlString) throws Exception { return send("GET", urlString); } + /** + * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body } + * is set to empty String by default. + * @see RequestSender#send(String,String,String) + */ + public String send(String method, final String urlString) throws Exception { + return send(method,urlString,""); + } /** - * Sends a request to a given endpoint. + * Sends an http request to a given endpoint. * @param method of the outbound request * @param urlString representing given endpoint + * @param body of the request as json * @return http response body * @throws Exception */ - public String send(String method, final String urlString) throws Exception { + public String send(String method, final String urlString, final String body) throws Exception { final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS); final UUID requestID = UUID.randomUUID(); String result = ""; for (int i = 1; i <= MAX_RETRIES; i++) { - URL url = new URL(urlString); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString()); - connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString()); - connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME); - connection.setRequestMethod(method); - logger.unwrap() - .info("Sending:\n{}", connection.getRequestProperties()); + final URL url = new URL(urlString); + final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID); + if(!body.isEmpty()) { + setMessageBody(connection, body); + } + logger.unwrap().info("Sending {} request to {}.", method, urlString); try (InputStream is = connection.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { @@ -81,15 +90,12 @@ public class RequestSender { .collect(Collectors.joining("\n")); int responseCode = connection.getResponseCode(); if (!(isWithinErrorRange(responseCode))) { - logger.unwrap() - .info("Received:\n{}", result); + logger.unwrap().info("Server Response Received:\n{}", result); break; } - } catch (Exception e) { if (retryLimitReached(i)) { - logger.unwrap() - .error("Execution error: "+connection.getResponseMessage(), e); + logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e); throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e); } } @@ -99,6 +105,26 @@ public class RequestSender { return result; } + private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception { + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setReadTimeout(DEFAULT_READ_TIMEOUT); + connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString()); + connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString()); + connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME); + connection.setRequestMethod(method); + + return connection; + } + + private void setMessageBody(HttpURLConnection connection, String body) throws IOException { + connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE); + connection.setDoOutput(true); + OutputStream outputStream = connection.getOutputStream(); + outputStream.write(body.getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + outputStream.close(); + } + private boolean retryLimitReached(final int retryCount) { return retryCount >= MAX_RETRIES; } diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java new file mode 100644 index 0000000..69d34f8 --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.pmmapper.messagerouter; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import reactor.test.StepVerifier; +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException; +import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher; +import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import reactor.core.publisher.Flux; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentConfig.class) +public class VESPublisherTest { + + private static String topicURL = "http://mr/topic"; + private static RequestSender sender; + private static MapperConfig config; + private VESPublisher sut; + private String ves = "{}"; + + @Before + public void before() throws Exception { + config = mock(MapperConfig.class); + sender = mock(RequestSender.class); + sut = new VESPublisher(config, sender); + when(config.getPublisherTopicUrl()).thenReturn(topicURL); + } + + @Test + public void publish_multiple_success() throws Exception { + Event event = mock(Event.class); + List events = Arrays.asList(event,event,event); + when(event.getVes()).thenReturn(ves); + + Flux flux = sut.publish(events); + + verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString()); + StepVerifier.create(flux) + .expectNextMatches(event::equals) + .expectComplete() + .verify(); + } + + @Test + public void publish_multiple_fail() throws Exception { + Event event = mock(Event.class); + List events = Arrays.asList(event,event,event); + when(event.getVes()).thenReturn(ves); + when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class); + + Flux flux = sut.publish(events); + + StepVerifier.create(flux) + .verifyComplete(); + } +} -- cgit