summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java19
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java31
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java90
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java102
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java30
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java9
-rw-r--r--src/main/resources/Dockerfile3
-rw-r--r--src/main/resources/reconfigure.sh6
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java153
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java168
-rw-r--r--src/test/resources/datarouter_subscriber_test/valid_bc_response.json15
11 files changed, 586 insertions, 40 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 09d8975..6ebc61c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -26,6 +26,8 @@ import io.undertow.util.StatusCodes;
import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
+import org.onap.dcaegen2.services.pmmapper.config.Configurable;
+import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
@@ -47,6 +49,7 @@ import reactor.core.scheduler.Schedulers;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
public class App {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
@@ -57,9 +60,7 @@ public class App {
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
-
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
-
MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
@@ -73,14 +74,18 @@ public class App {
.filter(validator::validate)
.map(mapper::map)
.subscribe(event -> logger.unwrap().info("Event Processed"));
-
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
- dataRouterSubscriber.start(mapperConfig);
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
+ dataRouterSubscriber.start();
+ ArrayList<Configurable> configurables = new ArrayList<>();
+ configurables.add(dataRouterSubscriber);
+ DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber)
- .add("get", "/healthcheck", healthCheckHandler))
+ .setHandler(Handlers.routing()
+ .add("put", "/delivery/{filename}", dataRouterSubscriber)
+ .add("get", "/healthcheck", healthCheckHandler)
+ .add("get", "/reconfigure", dynamicConfiguration))
.build().start();
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java
new file mode 100644
index 0000000..ac2fe57
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java
@@ -0,0 +1,31 @@
+/*
+ * -
+ * * ============LICENSE_START=======================================================
+ * * Copyright (C) 2019 Nordix Foundation.
+ * * ================================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ * * SPDX-License-Identifier: Apache-2.0
+ * * ============LICENSE_END=========================================================
+ *
+ */
+
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+
+public interface Configurable {
+
+ void reconfigure(MapperConfig mapperConfig) throws ReconfigurationException;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java
new file mode 100644
index 0000000..37fa8b5
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+import lombok.Data;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@Data
+public class DynamicConfiguration implements HttpHandler{
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DynamicConfiguration.class));
+ private List<Configurable> configurables;
+ private MapperConfig originalConfig;
+ private ConfigHandler configHandler;
+
+ /**
+ * Creates a Dynamic Configuration object with a list of configurable objects.
+ * @param configurables list of objects to reconfigure
+ * @param originalConfig original config to compare against.
+ */
+ public DynamicConfiguration(List<Configurable> configurables, MapperConfig originalConfig){
+ this.configurables = configurables;
+ this.originalConfig = originalConfig;
+ this.configHandler = new ConfigHandler();
+ }
+
+ private void applyConfiguration(MapperConfig updatedConfig) throws ReconfigurationException {
+ for (Configurable configurable : configurables) {
+ logger.unwrap().debug("Reconfiguring: {}", configurable);
+ configurable.reconfigure(updatedConfig);
+ }
+ }
+
+ /**
+ * Receives requests to pull the latest configuration from CBS.
+ * @param httpServerExchange inbound http server exchange.
+ * @throws Exception
+ */
+ @Override
+ public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
+ try {
+ logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
+ MapperConfig config = configHandler.getMapperConfig();
+ int responseCode = StatusCodes.OK;
+ String responseMessage = StatusCodes.OK_STRING;
+
+ if (!this.originalConfig.equals(config)) {
+ logger.unwrap().info("Configuration update detected.");
+ logger.unwrap().info("Reconfiguring configurables");
+ try {
+ applyConfiguration(config);
+ this.originalConfig = config;
+ } catch (ReconfigurationException e) {
+ responseCode = StatusCodes.INTERNAL_SERVER_ERROR;
+ responseMessage = StatusCodes.INTERNAL_SERVER_ERROR_STRING;
+ applyConfiguration(this.originalConfig);
+ }
+ }
+ httpServerExchange.setStatusCode(responseCode).getResponseSender().send(responseMessage);
+ } finally {
+ logger.exiting();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index 2f2ab4d..f37bcd3 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -24,11 +24,15 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
import io.undertow.util.HeaderValues;
import lombok.Data;
import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.config.Configurable;
import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -44,23 +48,28 @@ import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Subscriber for events sent from data router
* Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
*/
@Data
-public class DataRouterSubscriber implements HttpHandler {
+public class DataRouterSubscriber implements HttpHandler, Configurable {
public static final String METADATA_HEADER = "X-DMAAP-DR-META";
public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
@@ -75,40 +84,41 @@ public class DataRouterSubscriber implements HttpHandler {
private boolean limited = false;
private Random jitterGenerator;
private Gson metadataBuilder;
+ private MapperConfig config;
+ private String subscriberId;
@NonNull
private EventReceiver eventReceiver;
/**
* @param eventReceiver receiver for any inbound events.
*/
- public DataRouterSubscriber(EventReceiver eventReceiver) {
+ public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) {
this.eventReceiver = eventReceiver;
this.jitterGenerator = new Random();
this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
.create();
+ this.config = config;
+ this.subscriberId="";
}
/**
* Starts data flow by subscribing to data router through bus controller.
*
- * @param config configuration object containing bus controller endpoint for subscription and
- * all non constant configuration for subscription through this endpoint.
* @throws TooManyTriesException in the event that timeout has occurred several times.
*/
- public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
+ public void start() throws TooManyTriesException, InterruptedException {
try {
logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
- subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ subscribe();
logger.unwrap().info("Successfully started DR Subscriber");
} finally {
logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
}
}
- private HttpURLConnection getBusControllerConnection(MapperConfig config, int timeout) throws IOException {
- HttpURLConnection connection = (HttpURLConnection) config.getBusControllerSubscriptionUrl()
- .openConnection();
- connection.setRequestMethod("POST");
+ private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) resource.openConnection();
+ connection.setRequestMethod(method);
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
connection.setRequestProperty("Content-Type", "application/json");
@@ -135,26 +145,66 @@ public class DataRouterSubscriber implements HttpHandler {
return subscriberObj;
}
- private void subscribe(int attempts, int timeout, MapperConfig config) throws TooManyTriesException, InterruptedException {
+ private void processResponse(HttpURLConnection connection) throws IOException {
+ try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+ String body = responseBody.lines().collect(Collectors.joining(""));
+ updateSubscriberId(body);
+ } catch (IOException | JsonSyntaxException | IllegalStateException e) {
+ throw new IOException("Failed to process response", e);
+ }
+ }
+
+ private void updateSubscriberId(String responseBody) {
+ JsonParser parser = new JsonParser();
+ JsonObject responseObject = parser.parse(responseBody).getAsJsonObject();
+ this.subscriberId = responseObject.get("subId").getAsString();
+ }
+
+ private void subscribe() throws TooManyTriesException, InterruptedException {
+ try {
+ URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
+ JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
+ request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody);
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException("Subscription URL is malformed", e);
+ }
+
+ }
+ private void updateSubscriber() throws TooManyTriesException, InterruptedException {
+ try {
+ URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
+ URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId));
+ JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
+ request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody);
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException("Subscription URL is malformed", e);
+ }
+ }
+
+ private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException {
int subResponse = 504;
String subMessage = "";
+ boolean processFailure = false;
try {
- HttpURLConnection connection = getBusControllerConnection(config, timeout);
-
+ HttpURLConnection connection = getBusControllerConnection(method, resource, timeout);
try (OutputStream bodyStream = connection.getOutputStream();
OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
- bodyWriter.write(getBusControllerSubscribeBody(config).toString());
+ bodyWriter.write(subscribeBody.toString());
}
subResponse = connection.getResponseCode();
subMessage = connection.getResponseMessage();
+ if (subResponse < 300) {
+ processResponse(connection);
+ }
} catch (IOException e) {
- logger.unwrap().error("Timeout Failure:", e);
+ logger.unwrap().error("Failure to process response", e);
+ processFailure = true;
}
logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
- if (subResponse >= 300 && attempts > 1) {
+ if ((subResponse >= 300 || processFailure) && attempts > 1 ) {
Thread.sleep(timeout);
- subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config);
- } else if (subResponse >= 300) {
+ request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody);
+ } else if (subResponse >= 300 || processFailure) {
throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
}
}
@@ -209,4 +259,20 @@ public class DataRouterSubscriber implements HttpHandler {
logger.exiting();
}
}
+
+ @Override
+ public void reconfigure(MapperConfig config) throws ReconfigurationException {
+ logger.unwrap().info("Checking new Configuration against existing.");
+ if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){
+ logger.unwrap().info("DMaaP Info changes found, reconfiguring.");
+ try {
+ this.config = config;
+ this.updateSubscriber();
+ } catch (TooManyTriesException | InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e);
+ }
+ }
+
+ }
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java
new file mode 100644
index 0000000..66e3aee
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class ReconfigurationException extends Exception {
+ public ReconfigurationException(String message) {
+ super(message);
+ }
+ public ReconfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index 2f13080..d28d850 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -81,6 +81,15 @@ public class MapperConfig {
public String getSubscriberIdentity(){
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+
+ public boolean dmaapInfoEquals(MapperConfig mapperConfig){
+ return this
+ .getStreamsSubscribes()
+ .getDmaapSubscriber()
+ .getDmaapInfo()
+ .equals(mapperConfig.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo());
+ }
+
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/resources/Dockerfile b/src/main/resources/Dockerfile
index e2b6b89..2f4505c 100644
--- a/src/main/resources/Dockerfile
+++ b/src/main/resources/Dockerfile
@@ -19,7 +19,7 @@
#
FROM openjdk:8-jre-alpine
-ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/pm-mapper/pm-mapper.jar"]
+ENTRYPOINT ["/bin/sh", "-c", "nohup sh etc/reconfigure.sh & /usr/bin/java -jar /opt/app/pm-mapper/pm-mapper.jar"]
ARG JAR
WORKDIR /opt/app/pm-mapper
@@ -32,5 +32,6 @@ RUN addgroup -S onap \
ADD target/${JAR} /opt/app/pm-mapper/pm-mapper.jar
ADD target/classes/mapping.ftl /opt/app/pm-mapper/etc/mapping.ftl
ADD target/classes/measCollec_plusString.xsd /opt/app/pm-mapper/etc/measCollec_plusString.xsd
+ADD target/classes/reconfigure.sh /opt/app/pm-mapper/etc/reconfigure.sh
USER pm-mapper
diff --git a/src/main/resources/reconfigure.sh b/src/main/resources/reconfigure.sh
new file mode 100644
index 0000000..ac6f940
--- /dev/null
+++ b/src/main/resources/reconfigure.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env sh
+while true
+do
+ sleep 60
+ echo $(wget -S --spider localhost:8081/reconfigure 2>&1) >> /var/log/ONAP/dcaegen2/services/pm-mapper/reconfigure.log
+done
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
new file mode 100644
index 0000000..67510da
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
@@ -0,0 +1,153 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.config;
+
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DynamicConfiguration.class, EnvironmentConfig.class})
+public class DynamicConfigurationTest {
+ private static Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json");
+
+ private static ArrayList<Configurable> configurables;
+ private DynamicConfiguration objUnderTest;
+ private static String config;
+ private MapperConfig originalMapperConfig;
+
+ @Mock
+ private RequestSender sender;
+
+ @BeforeClass()
+ public static void setupBeforeClass() throws Exception {
+ config = new String(Files.readAllBytes(VALID_CONFIG_PATH));
+ }
+
+ @Before
+ public void setup() throws Exception {
+ configurables = new ArrayList<>();
+ PowerMockito.mockStatic(EnvironmentConfig.class);
+ PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
+ PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
+ PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
+
+ when(sender.send(any())).thenReturn(config);
+ ConfigHandler configHandler = new ConfigHandler(sender);
+ originalMapperConfig = configHandler.getMapperConfig();
+ objUnderTest = new DynamicConfiguration(configurables, originalMapperConfig);
+ }
+
+ @Test
+ public void testNoChangeResponse() throws Exception {
+ ConfigHandler configHandler = new ConfigHandler(sender);
+ originalMapperConfig = configHandler.getMapperConfig();
+ objUnderTest.setConfigHandler(configHandler);
+
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ objUnderTest.handleRequest(httpServerExchange);
+ assertEquals(originalMapperConfig, objUnderTest.getOriginalConfig());
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.OK);
+ }
+
+ @Test
+ public void testApplyOriginalUponFailure() throws Exception {
+ ConfigHandler configHandler = new ConfigHandler(sender);
+ Configurable configurable = mock(Configurable.class);
+ configurables.add(configurable);
+ JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
+ modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+ when(sender.send(any())).thenReturn(modifiedConfig.toString());
+ MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
+
+ objUnderTest.setConfigHandler(configHandler);
+
+ doAnswer(new Answer() {
+ boolean failFirstReconfigure = true;
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (failFirstReconfigure) {
+ failFirstReconfigure = false;
+ throw new ReconfigurationException("");
+ }
+ return null;
+ }
+ }).when(configurable).reconfigure(any());
+
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ objUnderTest.handleRequest(httpServerExchange);
+ assertEquals(originalMapperConfig, objUnderTest.getOriginalConfig());
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR);
+ verify(configurable, times(1)).reconfigure(modifiedMapperConfig);
+ verify(configurable, times(1)).reconfigure(originalMapperConfig);
+ }
+
+ @Test
+ public void testSuccessfulReconfiguration() throws Exception {
+ ConfigHandler configHandler = new ConfigHandler(sender);
+ Configurable configurable = mock(Configurable.class);
+ configurables.add(configurable);
+ JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
+ modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+
+ when(sender.send(any())).thenReturn(modifiedConfig.toString());
+ MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
+ objUnderTest.setConfigHandler(configHandler);
+
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ objUnderTest.handleRequest(httpServerExchange);
+ assertEquals(modifiedMapperConfig, objUnderTest.getOriginalConfig());
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.OK);
+ verify(configurable, times(1)).reconfigure(modifiedMapperConfig);
+
+ }
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
index fdc1bf6..dbb95a7 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
@@ -43,10 +43,15 @@ import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.StatusCodes;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Before;
import org.junit.Test;
@@ -55,28 +60,38 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import utils.LoggingUtils;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(DataRouterSubscriber.class)
+@PrepareForTest({DataRouterSubscriber.class, EnvironmentConfig.class})
public class DataRouterSubscriberTest {
+ private Path VALID_BC_RESPONSE_PATH = Paths.get("src/test/resources/datarouter_subscriber_test/valid_bc_response.json");
+ private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json");
+ private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json");
+ private Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json");
@Mock
private EventReceiver eventReceiver;
+ @Mock
+ private MapperConfig config;
private DataRouterSubscriber objUnderTest;
@Before
public void setUp() {
- objUnderTest = new DataRouterSubscriber(eventReceiver);
+ objUnderTest = new DataRouterSubscriber(eventReceiver, config);
}
@Test
@@ -84,34 +99,37 @@ public class DataRouterSubscriberTest {
PowerMockito.mockStatic(Thread.class);
URL subEndpoint = mock(URL.class);
- MapperConfig config = mock(MapperConfig.class);
when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
when(subEndpoint.openConnection()).thenReturn(huc);
when(huc.getResponseCode()).thenReturn(300);
- Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
}
@Test
public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
URL subEndpoint = mock(URL.class);
- MapperConfig config = mock(MapperConfig.class);
when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
+ InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
+ when(huc.getInputStream()).thenReturn(responseStream);
when(subEndpoint.openConnection()).thenReturn(huc);
when(huc.getResponseCode()).thenReturn(200);
- objUnderTest.start(config);
+ objUnderTest.start();
verify(huc, times(1)).getResponseCode();
}
@Test
public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
PowerMockito.mockStatic(Thread.class);
-
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
+ InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
+ when(huc.getInputStream()).thenReturn(responseStream);
URL subEndpoint = mock(URL.class);
- MapperConfig config = mock(MapperConfig.class);
when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
- HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+
when(subEndpoint.openConnection()).thenReturn(huc);
doAnswer(new Answer() {
boolean forceRetry = true;
@@ -125,7 +143,7 @@ public class DataRouterSubscriberTest {
return 200;
}
}).when(huc).getResponseCode();
- objUnderTest.start(config);
+ objUnderTest.start();
verify(huc, times(2)).getResponseCode();
}
@@ -134,12 +152,11 @@ public class DataRouterSubscriberTest {
PowerMockito.mockStatic(Thread.class);
URL subEndpoint = mock(URL.class);
- MapperConfig config = mock(MapperConfig.class);
when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
when(subEndpoint.openConnection()).thenReturn(huc);
doThrow(new IOException()).when(huc).getResponseCode();
- Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
}
@Test
@@ -171,10 +188,29 @@ public class DataRouterSubscriberTest {
}
@Test
+ public void testStartPositiveResponseCodeInvalidResponseBody() throws Exception{
+ PowerMockito.mockStatic(EnvironmentConfig.class);
+ PowerMockito.mockStatic(Thread.class);
+ PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
+ PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
+ PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
+
+ URL mockURL = mock(URL.class);
+ when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ String bcResponse = "not a valid response";
+ InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
+ when(huc.getInputStream()).thenReturn(responseStream);
+ when(mockURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(200);
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
+ }
+
+ @Test
public void testRequestInboundInvalidMetadata() throws Exception {
HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
JsonObject metadata = new JsonParser().parse(new String(Files
- .readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject();
+ .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject();
when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
.thenReturn(metadata.toString());
when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
@@ -218,7 +254,7 @@ public class DataRouterSubscriberTest {
when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
String testString = "MESSAGE BODY";
JsonObject metadata = new JsonParser().parse(
- new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
+ new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject();
when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt()))
.thenReturn(metadata.toString());
when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn("");
@@ -244,4 +280,108 @@ public class DataRouterSubscriberTest {
logAppender.stop();
}
+ @Test
+ public void testConfigThrowsMalformedURLException() throws MalformedURLException {
+ when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
+ Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.start());
+ }
+ @Test
+ public void testReconfigurationSameConfig() throws Exception {
+ PowerMockito.mockStatic(EnvironmentConfig.class);
+ PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
+ PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
+ PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
+
+ RequestSender sender = mock(RequestSender.class);
+ String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
+ when(sender.send(any())).thenReturn(mapperConfig);
+ MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
+
+ DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
+ objUnderTest.reconfigure(originalMapperConfig);
+ assertEquals(originalMapperConfig, objUnderTest.getConfig());
+ }
+
+ @Test
+ public void testReconfigurationModifiedFeedId() throws Exception {
+ PowerMockito.mockStatic(EnvironmentConfig.class);
+ PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
+ PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
+ PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
+
+ URL mockURL = mock(URL.class);
+ when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
+ InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
+ when(huc.getInputStream()).thenReturn(responseStream);
+ when(mockURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(200);
+
+ PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+
+ RequestSender sender = mock(RequestSender.class);
+ String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
+ when(sender.send(any())).thenReturn(mapperConfig);
+ MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
+ JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
+ modifiedMapperConfigObj.addProperty("dmaap_dr_feed_id", "3");
+ when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
+ MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
+
+ DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
+ objUnderTest.reconfigure(modifiedMapperConfig);
+ assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
+ }
+
+ @Test
+ public void testReconfigurationModifiedUsername() throws Exception {
+ PowerMockito.mockStatic(EnvironmentConfig.class);
+ PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
+ PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
+ PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
+
+ URL mockURL = mock(URL.class);
+ when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
+ InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
+ when(huc.getInputStream()).thenReturn(responseStream);
+ when(mockURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(200);
+
+ PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+
+ RequestSender sender = mock(RequestSender.class);
+ String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
+ when(sender.send(any())).thenReturn(mapperConfig);
+ MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
+ JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
+ modifiedMapperConfigObj.get("streams_subscribes")
+ .getAsJsonObject().get("dmaap_subscriber")
+ .getAsJsonObject().get("dmaap_info")
+ .getAsJsonObject()
+ .addProperty("username", "bob");
+ when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
+ MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
+
+ DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
+ objUnderTest.reconfigure(modifiedMapperConfig);
+ assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
+ }
+
+ @Test
+ public void testReconfigurationMalformedURL() throws Exception {
+ when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
+ Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.reconfigure(config));
+ }
+ @Test
+ public void testReconfigurationException() throws Exception {
+ PowerMockito.mockStatic(Thread.class);
+ URL url = mock(URL.class);
+ when(url.toString()).thenReturn("http://valid:8080/");
+ when(url.openConnection()).thenThrow(IOException.class);
+ when(config.getBusControllerSubscriptionUrl()).thenReturn(url);
+ Assertions.assertThrows(ReconfigurationException.class, () -> objUnderTest.reconfigure(config));
+ }
}
diff --git a/src/test/resources/datarouter_subscriber_test/valid_bc_response.json b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json
new file mode 100644
index 0000000..201b786
--- /dev/null
+++ b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json
@@ -0,0 +1,15 @@
+{
+ "type": "dr_Sub",
+ "lastMod": "2019-03-11T14:29:39.659",
+ "status": "VALID",
+ "dcaeLocationName": "location",
+ "deliveryURL": "delivery_url",
+ "feedId": "2",
+ "logURL": "https://dmaap-dr-prov/sublog/2",
+ "owner": "DGL",
+ "subId": "1",
+ "suspended": false,
+ "use100": false,
+ "username": "username",
+ "userpwd": "password"
+} \ No newline at end of file