diff options
Diffstat (limited to 'src')
11 files changed, 586 insertions, 40 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 09d8975..6ebc61c 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -26,6 +26,8 @@ import io.undertow.util.StatusCodes; import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; +import org.onap.dcaegen2.services.pmmapper.config.Configurable; +import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration; import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; @@ -47,6 +49,7 @@ import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); @@ -57,9 +60,7 @@ public class App { public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); - MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); - MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); @@ -73,14 +74,18 @@ public class App { .filter(validator::validate) .map(mapper::map) .subscribe(event -> logger.unwrap().info("Event Processed")); - - DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); - dataRouterSubscriber.start(mapperConfig); + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); + dataRouterSubscriber.start(); + ArrayList<Configurable> configurables = new ArrayList<>(); + configurables.add(dataRouterSubscriber); + DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig); Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber) - .add("get", "/healthcheck", healthCheckHandler)) + .setHandler(Handlers.routing() + .add("put", "/delivery/{filename}", dataRouterSubscriber) + .add("get", "/healthcheck", healthCheckHandler) + .add("get", "/reconfigure", dynamicConfiguration)) .build().start(); } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java new file mode 100644 index 0000000..ac2fe57 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java @@ -0,0 +1,31 @@ +/* + * - + * * ============LICENSE_START======================================================= + * * Copyright (C) 2019 Nordix Foundation. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * * ============LICENSE_END========================================================= + * + */ + +package org.onap.dcaegen2.services.pmmapper.config; + +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; + +public interface Configurable { + + void reconfigure(MapperConfig mapperConfig) throws ReconfigurationException; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java new file mode 100644 index 0000000..37fa8b5 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.config; + +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; +import lombok.Data; +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +import java.util.List; + +@Data +public class DynamicConfiguration implements HttpHandler{ + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DynamicConfiguration.class)); + private List<Configurable> configurables; + private MapperConfig originalConfig; + private ConfigHandler configHandler; + + /** + * Creates a Dynamic Configuration object with a list of configurable objects. + * @param configurables list of objects to reconfigure + * @param originalConfig original config to compare against. + */ + public DynamicConfiguration(List<Configurable> configurables, MapperConfig originalConfig){ + this.configurables = configurables; + this.originalConfig = originalConfig; + this.configHandler = new ConfigHandler(); + } + + private void applyConfiguration(MapperConfig updatedConfig) throws ReconfigurationException { + for (Configurable configurable : configurables) { + logger.unwrap().debug("Reconfiguring: {}", configurable); + configurable.reconfigure(updatedConfig); + } + } + + /** + * Receives requests to pull the latest configuration from CBS. + * @param httpServerExchange inbound http server exchange. + * @throws Exception + */ + @Override + public void handleRequest(HttpServerExchange httpServerExchange) throws Exception { + try { + logger.entering(new HttpServerExchangeAdapter(httpServerExchange)); + MapperConfig config = configHandler.getMapperConfig(); + int responseCode = StatusCodes.OK; + String responseMessage = StatusCodes.OK_STRING; + + if (!this.originalConfig.equals(config)) { + logger.unwrap().info("Configuration update detected."); + logger.unwrap().info("Reconfiguring configurables"); + try { + applyConfiguration(config); + this.originalConfig = config; + } catch (ReconfigurationException e) { + responseCode = StatusCodes.INTERNAL_SERVER_ERROR; + responseMessage = StatusCodes.INTERNAL_SERVER_ERROR_STRING; + applyConfiguration(this.originalConfig); + } + } + httpServerExchange.setStatusCode(responseCode).getResponseSender().send(responseMessage); + } finally { + logger.exiting(); + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java index 2f2ab4d..f37bcd3 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -24,11 +24,15 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; import io.undertow.util.HeaderValues; import lombok.Data; import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.config.Configurable; import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException; +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; @@ -44,23 +48,28 @@ import org.onap.logging.ref.slf4j.ONAPLogConstants; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; /** * Subscriber for events sent from data router * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to. */ @Data -public class DataRouterSubscriber implements HttpHandler { +public class DataRouterSubscriber implements HttpHandler, Configurable { public static final String METADATA_HEADER = "X-DMAAP-DR-META"; public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID"; @@ -75,40 +84,41 @@ public class DataRouterSubscriber implements HttpHandler { private boolean limited = false; private Random jitterGenerator; private Gson metadataBuilder; + private MapperConfig config; + private String subscriberId; @NonNull private EventReceiver eventReceiver; /** * @param eventReceiver receiver for any inbound events. */ - public DataRouterSubscriber(EventReceiver eventReceiver) { + public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) { this.eventReceiver = eventReceiver; this.jitterGenerator = new Random(); this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>()) .create(); + this.config = config; + this.subscriberId=""; } /** * Starts data flow by subscribing to data router through bus controller. * - * @param config configuration object containing bus controller endpoint for subscription and - * all non constant configuration for subscription through this endpoint. * @throws TooManyTriesException in the event that timeout has occurred several times. */ - public void start(MapperConfig config) throws TooManyTriesException, InterruptedException { + public void start() throws TooManyTriesException, InterruptedException { try { logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY); - subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + subscribe(); logger.unwrap().info("Successfully started DR Subscriber"); } finally { logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT); } } - private HttpURLConnection getBusControllerConnection(MapperConfig config, int timeout) throws IOException { - HttpURLConnection connection = (HttpURLConnection) config.getBusControllerSubscriptionUrl() - .openConnection(); - connection.setRequestMethod("POST"); + private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException { + HttpURLConnection connection = (HttpURLConnection) resource.openConnection(); + connection.setRequestMethod(method); connection.setConnectTimeout(timeout); connection.setReadTimeout(timeout); connection.setRequestProperty("Content-Type", "application/json"); @@ -135,26 +145,66 @@ public class DataRouterSubscriber implements HttpHandler { return subscriberObj; } - private void subscribe(int attempts, int timeout, MapperConfig config) throws TooManyTriesException, InterruptedException { + private void processResponse(HttpURLConnection connection) throws IOException { + try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String body = responseBody.lines().collect(Collectors.joining("")); + updateSubscriberId(body); + } catch (IOException | JsonSyntaxException | IllegalStateException e) { + throw new IOException("Failed to process response", e); + } + } + + private void updateSubscriberId(String responseBody) { + JsonParser parser = new JsonParser(); + JsonObject responseObject = parser.parse(responseBody).getAsJsonObject(); + this.subscriberId = responseObject.get("subId").getAsString(); + } + + private void subscribe() throws TooManyTriesException, InterruptedException { + try { + URL subscribeResource = this.config.getBusControllerSubscriptionUrl(); + JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config); + request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody); + } catch (MalformedURLException e) { + throw new IllegalStateException("Subscription URL is malformed", e); + } + + } + private void updateSubscriber() throws TooManyTriesException, InterruptedException { + try { + URL subscribeResource = this.config.getBusControllerSubscriptionUrl(); + URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId)); + JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config); + request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody); + } catch (MalformedURLException e) { + throw new IllegalStateException("Subscription URL is malformed", e); + } + } + + private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException { int subResponse = 504; String subMessage = ""; + boolean processFailure = false; try { - HttpURLConnection connection = getBusControllerConnection(config, timeout); - + HttpURLConnection connection = getBusControllerConnection(method, resource, timeout); try (OutputStream bodyStream = connection.getOutputStream(); OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) { - bodyWriter.write(getBusControllerSubscribeBody(config).toString()); + bodyWriter.write(subscribeBody.toString()); } subResponse = connection.getResponseCode(); subMessage = connection.getResponseMessage(); + if (subResponse < 300) { + processResponse(connection); + } } catch (IOException e) { - logger.unwrap().error("Timeout Failure:", e); + logger.unwrap().error("Failure to process response", e); + processFailure = true; } logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage); - if (subResponse >= 300 && attempts > 1) { + if ((subResponse >= 300 || processFailure) && attempts > 1 ) { Thread.sleep(timeout); - subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config); - } else if (subResponse >= 300) { + request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody); + } else if (subResponse >= 300 || processFailure) { throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts"); } } @@ -209,4 +259,20 @@ public class DataRouterSubscriber implements HttpHandler { logger.exiting(); } } + + @Override + public void reconfigure(MapperConfig config) throws ReconfigurationException { + logger.unwrap().info("Checking new Configuration against existing."); + if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){ + logger.unwrap().info("DMaaP Info changes found, reconfiguring."); + try { + this.config = config; + this.updateSubscriber(); + } catch (TooManyTriesException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e); + } + } + + } } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java new file mode 100644 index 0000000..66e3aee --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class ReconfigurationException extends Exception { + public ReconfigurationException(String message) { + super(message); + } + public ReconfigurationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java index 2f13080..d28d850 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java @@ -81,6 +81,15 @@ public class MapperConfig { public String getSubscriberIdentity(){
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+
+ public boolean dmaapInfoEquals(MapperConfig mapperConfig){
+ return this
+ .getStreamsSubscribes()
+ .getDmaapSubscriber()
+ .getDmaapInfo()
+ .equals(mapperConfig.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo());
+ }
+
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/resources/Dockerfile b/src/main/resources/Dockerfile index e2b6b89..2f4505c 100644 --- a/src/main/resources/Dockerfile +++ b/src/main/resources/Dockerfile @@ -19,7 +19,7 @@ # FROM openjdk:8-jre-alpine -ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/pm-mapper/pm-mapper.jar"] +ENTRYPOINT ["/bin/sh", "-c", "nohup sh etc/reconfigure.sh & /usr/bin/java -jar /opt/app/pm-mapper/pm-mapper.jar"] ARG JAR WORKDIR /opt/app/pm-mapper @@ -32,5 +32,6 @@ RUN addgroup -S onap \ ADD target/${JAR} /opt/app/pm-mapper/pm-mapper.jar ADD target/classes/mapping.ftl /opt/app/pm-mapper/etc/mapping.ftl ADD target/classes/measCollec_plusString.xsd /opt/app/pm-mapper/etc/measCollec_plusString.xsd +ADD target/classes/reconfigure.sh /opt/app/pm-mapper/etc/reconfigure.sh USER pm-mapper diff --git a/src/main/resources/reconfigure.sh b/src/main/resources/reconfigure.sh new file mode 100644 index 0000000..ac6f940 --- /dev/null +++ b/src/main/resources/reconfigure.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env sh +while true +do + sleep 60 + echo $(wget -S --spider localhost:8081/reconfigure 2>&1) >> /var/log/ONAP/dcaegen2/services/pm-mapper/reconfigure.log +done diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java new file mode 100644 index 0000000..67510da --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java @@ -0,0 +1,153 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.config; + + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; +import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({DynamicConfiguration.class, EnvironmentConfig.class}) +public class DynamicConfigurationTest { + private static Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json"); + + private static ArrayList<Configurable> configurables; + private DynamicConfiguration objUnderTest; + private static String config; + private MapperConfig originalMapperConfig; + + @Mock + private RequestSender sender; + + @BeforeClass() + public static void setupBeforeClass() throws Exception { + config = new String(Files.readAllBytes(VALID_CONFIG_PATH)); + } + + @Before + public void setup() throws Exception { + configurables = new ArrayList<>(); + PowerMockito.mockStatic(EnvironmentConfig.class); + PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn(""); + PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1); + PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn(""); + + when(sender.send(any())).thenReturn(config); + ConfigHandler configHandler = new ConfigHandler(sender); + originalMapperConfig = configHandler.getMapperConfig(); + objUnderTest = new DynamicConfiguration(configurables, originalMapperConfig); + } + + @Test + public void testNoChangeResponse() throws Exception { + ConfigHandler configHandler = new ConfigHandler(sender); + originalMapperConfig = configHandler.getMapperConfig(); + objUnderTest.setConfigHandler(configHandler); + + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + objUnderTest.handleRequest(httpServerExchange); + assertEquals(originalMapperConfig, objUnderTest.getOriginalConfig()); + verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.OK); + } + + @Test + public void testApplyOriginalUponFailure() throws Exception { + ConfigHandler configHandler = new ConfigHandler(sender); + Configurable configurable = mock(Configurable.class); + configurables.add(configurable); + JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject(); + modifiedConfig.addProperty("dmaap_dr_feed_id","3"); + when(sender.send(any())).thenReturn(modifiedConfig.toString()); + MapperConfig modifiedMapperConfig = configHandler.getMapperConfig(); + + objUnderTest.setConfigHandler(configHandler); + + doAnswer(new Answer() { + boolean failFirstReconfigure = true; + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + if (failFirstReconfigure) { + failFirstReconfigure = false; + throw new ReconfigurationException(""); + } + return null; + } + }).when(configurable).reconfigure(any()); + + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + objUnderTest.handleRequest(httpServerExchange); + assertEquals(originalMapperConfig, objUnderTest.getOriginalConfig()); + verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); + verify(configurable, times(1)).reconfigure(modifiedMapperConfig); + verify(configurable, times(1)).reconfigure(originalMapperConfig); + } + + @Test + public void testSuccessfulReconfiguration() throws Exception { + ConfigHandler configHandler = new ConfigHandler(sender); + Configurable configurable = mock(Configurable.class); + configurables.add(configurable); + JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject(); + modifiedConfig.addProperty("dmaap_dr_feed_id","3"); + + when(sender.send(any())).thenReturn(modifiedConfig.toString()); + MapperConfig modifiedMapperConfig = configHandler.getMapperConfig(); + objUnderTest.setConfigHandler(configHandler); + + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + objUnderTest.handleRequest(httpServerExchange); + assertEquals(modifiedMapperConfig, objUnderTest.getOriginalConfig()); + verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.OK); + verify(configurable, times(1)).reconfigure(modifiedMapperConfig); + + } +} diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java index fdc1bf6..dbb95a7 100644 --- a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java @@ -43,10 +43,15 @@ import io.undertow.server.HttpServerExchange; import io.undertow.util.HeaderMap; import io.undertow.util.StatusCodes; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import org.junit.Before; import org.junit.Test; @@ -55,28 +60,38 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter; +import org.onap.dcaegen2.services.pmmapper.utils.RequestSender; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import utils.LoggingUtils; @RunWith(PowerMockRunner.class) -@PrepareForTest(DataRouterSubscriber.class) +@PrepareForTest({DataRouterSubscriber.class, EnvironmentConfig.class}) public class DataRouterSubscriberTest { + private Path VALID_BC_RESPONSE_PATH = Paths.get("src/test/resources/datarouter_subscriber_test/valid_bc_response.json"); + private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json"); + private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json"); + private Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json"); @Mock private EventReceiver eventReceiver; + @Mock + private MapperConfig config; private DataRouterSubscriber objUnderTest; @Before public void setUp() { - objUnderTest = new DataRouterSubscriber(eventReceiver); + objUnderTest = new DataRouterSubscriber(eventReceiver, config); } @Test @@ -84,34 +99,37 @@ public class DataRouterSubscriberTest { PowerMockito.mockStatic(Thread.class); URL subEndpoint = mock(URL.class); - MapperConfig config = mock(MapperConfig.class); when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint); HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); when(subEndpoint.openConnection()).thenReturn(huc); when(huc.getResponseCode()).thenReturn(300); - Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config)); + Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start()); } @Test public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException { URL subEndpoint = mock(URL.class); - MapperConfig config = mock(MapperConfig.class); when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint); HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH)); + InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8)); + when(huc.getInputStream()).thenReturn(responseStream); when(subEndpoint.openConnection()).thenReturn(huc); when(huc.getResponseCode()).thenReturn(200); - objUnderTest.start(config); + objUnderTest.start(); verify(huc, times(1)).getResponseCode(); } @Test public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException { PowerMockito.mockStatic(Thread.class); - + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH)); + InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8)); + when(huc.getInputStream()).thenReturn(responseStream); URL subEndpoint = mock(URL.class); - MapperConfig config = mock(MapperConfig.class); when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint); - HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(subEndpoint.openConnection()).thenReturn(huc); doAnswer(new Answer() { boolean forceRetry = true; @@ -125,7 +143,7 @@ public class DataRouterSubscriberTest { return 200; } }).when(huc).getResponseCode(); - objUnderTest.start(config); + objUnderTest.start(); verify(huc, times(2)).getResponseCode(); } @@ -134,12 +152,11 @@ public class DataRouterSubscriberTest { PowerMockito.mockStatic(Thread.class); URL subEndpoint = mock(URL.class); - MapperConfig config = mock(MapperConfig.class); when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint); HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); when(subEndpoint.openConnection()).thenReturn(huc); doThrow(new IOException()).when(huc).getResponseCode(); - Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config)); + Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start()); } @Test @@ -171,10 +188,29 @@ public class DataRouterSubscriberTest { } @Test + public void testStartPositiveResponseCodeInvalidResponseBody() throws Exception{ + PowerMockito.mockStatic(EnvironmentConfig.class); + PowerMockito.mockStatic(Thread.class); + PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn(""); + PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1); + PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn(""); + + URL mockURL = mock(URL.class); + when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + String bcResponse = "not a valid response"; + InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8)); + when(huc.getInputStream()).thenReturn(responseStream); + when(mockURL.openConnection()).thenReturn(huc); + when(huc.getResponseCode()).thenReturn(200); + Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start()); + } + + @Test public void testRequestInboundInvalidMetadata() throws Exception { HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); JsonObject metadata = new JsonParser().parse(new String(Files - .readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject(); + .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject(); when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())) .thenReturn(metadata.toString()); when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange); @@ -218,7 +254,7 @@ public class DataRouterSubscriberTest { when(httpServerExchange.getRequestReceiver()).thenReturn(receiver); String testString = "MESSAGE BODY"; JsonObject metadata = new JsonParser().parse( - new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject(); + new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject(); when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt())) .thenReturn(metadata.toString()); when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn(""); @@ -244,4 +280,108 @@ public class DataRouterSubscriberTest { logAppender.stop(); } + @Test + public void testConfigThrowsMalformedURLException() throws MalformedURLException { + when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class); + Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.start()); + } + @Test + public void testReconfigurationSameConfig() throws Exception { + PowerMockito.mockStatic(EnvironmentConfig.class); + PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn(""); + PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1); + PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn(""); + + RequestSender sender = mock(RequestSender.class); + String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH)); + when(sender.send(any())).thenReturn(mapperConfig); + MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig(); + + DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig); + objUnderTest.reconfigure(originalMapperConfig); + assertEquals(originalMapperConfig, objUnderTest.getConfig()); + } + + @Test + public void testReconfigurationModifiedFeedId() throws Exception { + PowerMockito.mockStatic(EnvironmentConfig.class); + PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn(""); + PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1); + PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn(""); + + URL mockURL = mock(URL.class); + when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH)); + InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8)); + when(huc.getInputStream()).thenReturn(responseStream); + when(mockURL.openConnection()).thenReturn(huc); + when(huc.getResponseCode()).thenReturn(200); + + PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); + + RequestSender sender = mock(RequestSender.class); + String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH)); + when(sender.send(any())).thenReturn(mapperConfig); + MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig(); + JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject(); + modifiedMapperConfigObj.addProperty("dmaap_dr_feed_id", "3"); + when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString()); + MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig(); + + DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig); + objUnderTest.reconfigure(modifiedMapperConfig); + assertEquals(modifiedMapperConfig, objUnderTest.getConfig()); + } + + @Test + public void testReconfigurationModifiedUsername() throws Exception { + PowerMockito.mockStatic(EnvironmentConfig.class); + PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn(""); + PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1); + PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn(""); + + URL mockURL = mock(URL.class); + when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH)); + InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8)); + when(huc.getInputStream()).thenReturn(responseStream); + when(mockURL.openConnection()).thenReturn(huc); + when(huc.getResponseCode()).thenReturn(200); + + PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL); + + RequestSender sender = mock(RequestSender.class); + String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH)); + when(sender.send(any())).thenReturn(mapperConfig); + MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig(); + JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject(); + modifiedMapperConfigObj.get("streams_subscribes") + .getAsJsonObject().get("dmaap_subscriber") + .getAsJsonObject().get("dmaap_info") + .getAsJsonObject() + .addProperty("username", "bob"); + when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString()); + MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig(); + + DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig); + objUnderTest.reconfigure(modifiedMapperConfig); + assertEquals(modifiedMapperConfig, objUnderTest.getConfig()); + } + + @Test + public void testReconfigurationMalformedURL() throws Exception { + when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class); + Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.reconfigure(config)); + } + @Test + public void testReconfigurationException() throws Exception { + PowerMockito.mockStatic(Thread.class); + URL url = mock(URL.class); + when(url.toString()).thenReturn("http://valid:8080/"); + when(url.openConnection()).thenThrow(IOException.class); + when(config.getBusControllerSubscriptionUrl()).thenReturn(url); + Assertions.assertThrows(ReconfigurationException.class, () -> objUnderTest.reconfigure(config)); + } } diff --git a/src/test/resources/datarouter_subscriber_test/valid_bc_response.json b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json new file mode 100644 index 0000000..201b786 --- /dev/null +++ b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json @@ -0,0 +1,15 @@ +{ + "type": "dr_Sub", + "lastMod": "2019-03-11T14:29:39.659", + "status": "VALID", + "dcaeLocationName": "location", + "deliveryURL": "delivery_url", + "feedId": "2", + "logURL": "https://dmaap-dr-prov/sublog/2", + "owner": "DGL", + "subId": "1", + "suspended": false, + "use100": false, + "username": "username", + "userpwd": "password" +}
\ No newline at end of file |