summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java19
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java31
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java90
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java102
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java30
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java9
6 files changed, 256 insertions, 25 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 09d8975..6ebc61c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -26,6 +26,8 @@ import io.undertow.util.StatusCodes;
import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
+import org.onap.dcaegen2.services.pmmapper.config.Configurable;
+import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
@@ -47,6 +49,7 @@ import reactor.core.scheduler.Schedulers;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
public class App {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
@@ -57,9 +60,7 @@ public class App {
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
-
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
-
MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
@@ -73,14 +74,18 @@ public class App {
.filter(validator::validate)
.map(mapper::map)
.subscribe(event -> logger.unwrap().info("Event Processed"));
-
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
- dataRouterSubscriber.start(mapperConfig);
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
+ dataRouterSubscriber.start();
+ ArrayList<Configurable> configurables = new ArrayList<>();
+ configurables.add(dataRouterSubscriber);
+ DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
Undertow.builder()
.addHttpListener(8081, "0.0.0.0")
- .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber)
- .add("get", "/healthcheck", healthCheckHandler))
+ .setHandler(Handlers.routing()
+ .add("put", "/delivery/{filename}", dataRouterSubscriber)
+ .add("get", "/healthcheck", healthCheckHandler)
+ .add("get", "/reconfigure", dynamicConfiguration))
.build().start();
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java
new file mode 100644
index 0000000..ac2fe57
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java
@@ -0,0 +1,31 @@
+/*
+ * -
+ * * ============LICENSE_START=======================================================
+ * * Copyright (C) 2019 Nordix Foundation.
+ * * ================================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ * * SPDX-License-Identifier: Apache-2.0
+ * * ============LICENSE_END=========================================================
+ *
+ */
+
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+
+public interface Configurable {
+
+ void reconfigure(MapperConfig mapperConfig) throws ReconfigurationException;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java
new file mode 100644
index 0000000..37fa8b5
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+import lombok.Data;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@Data
+public class DynamicConfiguration implements HttpHandler{
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DynamicConfiguration.class));
+ private List<Configurable> configurables;
+ private MapperConfig originalConfig;
+ private ConfigHandler configHandler;
+
+ /**
+ * Creates a Dynamic Configuration object with a list of configurable objects.
+ * @param configurables list of objects to reconfigure
+ * @param originalConfig original config to compare against.
+ */
+ public DynamicConfiguration(List<Configurable> configurables, MapperConfig originalConfig){
+ this.configurables = configurables;
+ this.originalConfig = originalConfig;
+ this.configHandler = new ConfigHandler();
+ }
+
+ private void applyConfiguration(MapperConfig updatedConfig) throws ReconfigurationException {
+ for (Configurable configurable : configurables) {
+ logger.unwrap().debug("Reconfiguring: {}", configurable);
+ configurable.reconfigure(updatedConfig);
+ }
+ }
+
+ /**
+ * Receives requests to pull the latest configuration from CBS.
+ * @param httpServerExchange inbound http server exchange.
+ * @throws Exception
+ */
+ @Override
+ public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
+ try {
+ logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
+ MapperConfig config = configHandler.getMapperConfig();
+ int responseCode = StatusCodes.OK;
+ String responseMessage = StatusCodes.OK_STRING;
+
+ if (!this.originalConfig.equals(config)) {
+ logger.unwrap().info("Configuration update detected.");
+ logger.unwrap().info("Reconfiguring configurables");
+ try {
+ applyConfiguration(config);
+ this.originalConfig = config;
+ } catch (ReconfigurationException e) {
+ responseCode = StatusCodes.INTERNAL_SERVER_ERROR;
+ responseMessage = StatusCodes.INTERNAL_SERVER_ERROR_STRING;
+ applyConfiguration(this.originalConfig);
+ }
+ }
+ httpServerExchange.setStatusCode(responseCode).getResponseSender().send(responseMessage);
+ } finally {
+ logger.exiting();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index 2f2ab4d..f37bcd3 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -24,11 +24,15 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
import io.undertow.util.HeaderValues;
import lombok.Data;
import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.config.Configurable;
import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -44,23 +48,28 @@ import org.onap.logging.ref.slf4j.ONAPLogConstants;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Subscriber for events sent from data router
* Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
*/
@Data
-public class DataRouterSubscriber implements HttpHandler {
+public class DataRouterSubscriber implements HttpHandler, Configurable {
public static final String METADATA_HEADER = "X-DMAAP-DR-META";
public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
@@ -75,40 +84,41 @@ public class DataRouterSubscriber implements HttpHandler {
private boolean limited = false;
private Random jitterGenerator;
private Gson metadataBuilder;
+ private MapperConfig config;
+ private String subscriberId;
@NonNull
private EventReceiver eventReceiver;
/**
* @param eventReceiver receiver for any inbound events.
*/
- public DataRouterSubscriber(EventReceiver eventReceiver) {
+ public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) {
this.eventReceiver = eventReceiver;
this.jitterGenerator = new Random();
this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
.create();
+ this.config = config;
+ this.subscriberId="";
}
/**
* Starts data flow by subscribing to data router through bus controller.
*
- * @param config configuration object containing bus controller endpoint for subscription and
- * all non constant configuration for subscription through this endpoint.
* @throws TooManyTriesException in the event that timeout has occurred several times.
*/
- public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
+ public void start() throws TooManyTriesException, InterruptedException {
try {
logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
- subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ subscribe();
logger.unwrap().info("Successfully started DR Subscriber");
} finally {
logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
}
}
- private HttpURLConnection getBusControllerConnection(MapperConfig config, int timeout) throws IOException {
- HttpURLConnection connection = (HttpURLConnection) config.getBusControllerSubscriptionUrl()
- .openConnection();
- connection.setRequestMethod("POST");
+ private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) resource.openConnection();
+ connection.setRequestMethod(method);
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
connection.setRequestProperty("Content-Type", "application/json");
@@ -135,26 +145,66 @@ public class DataRouterSubscriber implements HttpHandler {
return subscriberObj;
}
- private void subscribe(int attempts, int timeout, MapperConfig config) throws TooManyTriesException, InterruptedException {
+ private void processResponse(HttpURLConnection connection) throws IOException {
+ try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+ String body = responseBody.lines().collect(Collectors.joining(""));
+ updateSubscriberId(body);
+ } catch (IOException | JsonSyntaxException | IllegalStateException e) {
+ throw new IOException("Failed to process response", e);
+ }
+ }
+
+ private void updateSubscriberId(String responseBody) {
+ JsonParser parser = new JsonParser();
+ JsonObject responseObject = parser.parse(responseBody).getAsJsonObject();
+ this.subscriberId = responseObject.get("subId").getAsString();
+ }
+
+ private void subscribe() throws TooManyTriesException, InterruptedException {
+ try {
+ URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
+ JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
+ request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody);
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException("Subscription URL is malformed", e);
+ }
+
+ }
+ private void updateSubscriber() throws TooManyTriesException, InterruptedException {
+ try {
+ URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
+ URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId));
+ JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
+ request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody);
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException("Subscription URL is malformed", e);
+ }
+ }
+
+ private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException {
int subResponse = 504;
String subMessage = "";
+ boolean processFailure = false;
try {
- HttpURLConnection connection = getBusControllerConnection(config, timeout);
-
+ HttpURLConnection connection = getBusControllerConnection(method, resource, timeout);
try (OutputStream bodyStream = connection.getOutputStream();
OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
- bodyWriter.write(getBusControllerSubscribeBody(config).toString());
+ bodyWriter.write(subscribeBody.toString());
}
subResponse = connection.getResponseCode();
subMessage = connection.getResponseMessage();
+ if (subResponse < 300) {
+ processResponse(connection);
+ }
} catch (IOException e) {
- logger.unwrap().error("Timeout Failure:", e);
+ logger.unwrap().error("Failure to process response", e);
+ processFailure = true;
}
logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
- if (subResponse >= 300 && attempts > 1) {
+ if ((subResponse >= 300 || processFailure) && attempts > 1 ) {
Thread.sleep(timeout);
- subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config);
- } else if (subResponse >= 300) {
+ request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody);
+ } else if (subResponse >= 300 || processFailure) {
throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
}
}
@@ -209,4 +259,20 @@ public class DataRouterSubscriber implements HttpHandler {
logger.exiting();
}
}
+
+ @Override
+ public void reconfigure(MapperConfig config) throws ReconfigurationException {
+ logger.unwrap().info("Checking new Configuration against existing.");
+ if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){
+ logger.unwrap().info("DMaaP Info changes found, reconfiguring.");
+ try {
+ this.config = config;
+ this.updateSubscriber();
+ } catch (TooManyTriesException | InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e);
+ }
+ }
+
+ }
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java
new file mode 100644
index 0000000..66e3aee
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class ReconfigurationException extends Exception {
+ public ReconfigurationException(String message) {
+ super(message);
+ }
+ public ReconfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index 2f13080..d28d850 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -81,6 +81,15 @@ public class MapperConfig {
public String getSubscriberIdentity(){
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+
+ public boolean dmaapInfoEquals(MapperConfig mapperConfig){
+ return this
+ .getStreamsSubscribes()
+ .getDmaapSubscriber()
+ .getDmaapInfo()
+ .equals(mapperConfig.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo());
+ }
+
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {