diff options
author | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-13 16:47:20 +0000 |
---|---|---|
committer | JoeOLeary <joseph.o.leary@est.tech> | 2019-03-13 16:47:20 +0000 |
commit | 290aa864c6a7d02a4f310c5d1cf8106e02e9dd47 (patch) | |
tree | 96aa9cbe1b20fad87b24ff40bb3983323f59783e /src/main | |
parent | 88adbc830c24f309c19fc5874654cc1cfaebc600 (diff) |
Add Dynamic Configuration
*Implement Configurable in DataRouter subscriber
*Add reconfigure.sh
*Update Dockerfile to use reconfigure.sh
Issue-ID: DCAEGEN2-1038
Change-Id: I9ce387819afff644c3dc76a94786fa1f95afd985
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src/main')
8 files changed, 264 insertions, 26 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 09d8975..6ebc61c 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -26,6 +26,8 @@ import io.undertow.util.StatusCodes; import lombok.NonNull; import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler; +import org.onap.dcaegen2.services.pmmapper.config.Configurable; +import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration; import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; @@ -47,6 +49,7 @@ import reactor.core.scheduler.Schedulers; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; public class App { private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class)); @@ -57,9 +60,7 @@ public class App { public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); - MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); - MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); @@ -73,14 +74,18 @@ public class App { .filter(validator::validate) .map(mapper::map) .subscribe(event -> logger.unwrap().info("Event Processed")); - - DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); - dataRouterSubscriber.start(mapperConfig); + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig); + dataRouterSubscriber.start(); + ArrayList<Configurable> configurables = new ArrayList<>(); + configurables.add(dataRouterSubscriber); + DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig); Undertow.builder() .addHttpListener(8081, "0.0.0.0") - .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber) - .add("get", "/healthcheck", healthCheckHandler)) + .setHandler(Handlers.routing() + .add("put", "/delivery/{filename}", dataRouterSubscriber) + .add("get", "/healthcheck", healthCheckHandler) + .add("get", "/reconfigure", dynamicConfiguration)) .build().start(); } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java new file mode 100644 index 0000000..ac2fe57 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/Configurable.java @@ -0,0 +1,31 @@ +/* + * - + * * ============LICENSE_START======================================================= + * * Copyright (C) 2019 Nordix Foundation. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * * ============LICENSE_END========================================================= + * + */ + +package org.onap.dcaegen2.services.pmmapper.config; + +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; + +public interface Configurable { + + void reconfigure(MapperConfig mapperConfig) throws ReconfigurationException; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java new file mode 100644 index 0000000..37fa8b5 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfiguration.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.config; + +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; +import lombok.Data; +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +import java.util.List; + +@Data +public class DynamicConfiguration implements HttpHandler{ + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DynamicConfiguration.class)); + private List<Configurable> configurables; + private MapperConfig originalConfig; + private ConfigHandler configHandler; + + /** + * Creates a Dynamic Configuration object with a list of configurable objects. + * @param configurables list of objects to reconfigure + * @param originalConfig original config to compare against. + */ + public DynamicConfiguration(List<Configurable> configurables, MapperConfig originalConfig){ + this.configurables = configurables; + this.originalConfig = originalConfig; + this.configHandler = new ConfigHandler(); + } + + private void applyConfiguration(MapperConfig updatedConfig) throws ReconfigurationException { + for (Configurable configurable : configurables) { + logger.unwrap().debug("Reconfiguring: {}", configurable); + configurable.reconfigure(updatedConfig); + } + } + + /** + * Receives requests to pull the latest configuration from CBS. + * @param httpServerExchange inbound http server exchange. + * @throws Exception + */ + @Override + public void handleRequest(HttpServerExchange httpServerExchange) throws Exception { + try { + logger.entering(new HttpServerExchangeAdapter(httpServerExchange)); + MapperConfig config = configHandler.getMapperConfig(); + int responseCode = StatusCodes.OK; + String responseMessage = StatusCodes.OK_STRING; + + if (!this.originalConfig.equals(config)) { + logger.unwrap().info("Configuration update detected."); + logger.unwrap().info("Reconfiguring configurables"); + try { + applyConfiguration(config); + this.originalConfig = config; + } catch (ReconfigurationException e) { + responseCode = StatusCodes.INTERNAL_SERVER_ERROR; + responseMessage = StatusCodes.INTERNAL_SERVER_ERROR_STRING; + applyConfiguration(this.originalConfig); + } + } + httpServerExchange.setStatusCode(responseCode).getResponseSender().send(responseMessage); + } finally { + logger.exiting(); + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java index 2f2ab4d..f37bcd3 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -24,11 +24,15 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; import io.undertow.util.HeaderValues; import lombok.Data; import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.config.Configurable; import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException; +import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; @@ -44,23 +48,28 @@ import org.onap.logging.ref.slf4j.ONAPLogConstants; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; /** * Subscriber for events sent from data router * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to. */ @Data -public class DataRouterSubscriber implements HttpHandler { +public class DataRouterSubscriber implements HttpHandler, Configurable { public static final String METADATA_HEADER = "X-DMAAP-DR-META"; public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID"; @@ -75,40 +84,41 @@ public class DataRouterSubscriber implements HttpHandler { private boolean limited = false; private Random jitterGenerator; private Gson metadataBuilder; + private MapperConfig config; + private String subscriberId; @NonNull private EventReceiver eventReceiver; /** * @param eventReceiver receiver for any inbound events. */ - public DataRouterSubscriber(EventReceiver eventReceiver) { + public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) { this.eventReceiver = eventReceiver; this.jitterGenerator = new Random(); this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>()) .create(); + this.config = config; + this.subscriberId=""; } /** * Starts data flow by subscribing to data router through bus controller. * - * @param config configuration object containing bus controller endpoint for subscription and - * all non constant configuration for subscription through this endpoint. * @throws TooManyTriesException in the event that timeout has occurred several times. */ - public void start(MapperConfig config) throws TooManyTriesException, InterruptedException { + public void start() throws TooManyTriesException, InterruptedException { try { logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY); - subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + subscribe(); logger.unwrap().info("Successfully started DR Subscriber"); } finally { logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT); } } - private HttpURLConnection getBusControllerConnection(MapperConfig config, int timeout) throws IOException { - HttpURLConnection connection = (HttpURLConnection) config.getBusControllerSubscriptionUrl() - .openConnection(); - connection.setRequestMethod("POST"); + private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException { + HttpURLConnection connection = (HttpURLConnection) resource.openConnection(); + connection.setRequestMethod(method); connection.setConnectTimeout(timeout); connection.setReadTimeout(timeout); connection.setRequestProperty("Content-Type", "application/json"); @@ -135,26 +145,66 @@ public class DataRouterSubscriber implements HttpHandler { return subscriberObj; } - private void subscribe(int attempts, int timeout, MapperConfig config) throws TooManyTriesException, InterruptedException { + private void processResponse(HttpURLConnection connection) throws IOException { + try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String body = responseBody.lines().collect(Collectors.joining("")); + updateSubscriberId(body); + } catch (IOException | JsonSyntaxException | IllegalStateException e) { + throw new IOException("Failed to process response", e); + } + } + + private void updateSubscriberId(String responseBody) { + JsonParser parser = new JsonParser(); + JsonObject responseObject = parser.parse(responseBody).getAsJsonObject(); + this.subscriberId = responseObject.get("subId").getAsString(); + } + + private void subscribe() throws TooManyTriesException, InterruptedException { + try { + URL subscribeResource = this.config.getBusControllerSubscriptionUrl(); + JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config); + request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody); + } catch (MalformedURLException e) { + throw new IllegalStateException("Subscription URL is malformed", e); + } + + } + private void updateSubscriber() throws TooManyTriesException, InterruptedException { + try { + URL subscribeResource = this.config.getBusControllerSubscriptionUrl(); + URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId)); + JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config); + request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody); + } catch (MalformedURLException e) { + throw new IllegalStateException("Subscription URL is malformed", e); + } + } + + private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException { int subResponse = 504; String subMessage = ""; + boolean processFailure = false; try { - HttpURLConnection connection = getBusControllerConnection(config, timeout); - + HttpURLConnection connection = getBusControllerConnection(method, resource, timeout); try (OutputStream bodyStream = connection.getOutputStream(); OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) { - bodyWriter.write(getBusControllerSubscribeBody(config).toString()); + bodyWriter.write(subscribeBody.toString()); } subResponse = connection.getResponseCode(); subMessage = connection.getResponseMessage(); + if (subResponse < 300) { + processResponse(connection); + } } catch (IOException e) { - logger.unwrap().error("Timeout Failure:", e); + logger.unwrap().error("Failure to process response", e); + processFailure = true; } logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage); - if (subResponse >= 300 && attempts > 1) { + if ((subResponse >= 300 || processFailure) && attempts > 1 ) { Thread.sleep(timeout); - subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config); - } else if (subResponse >= 300) { + request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody); + } else if (subResponse >= 300 || processFailure) { throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts"); } } @@ -209,4 +259,20 @@ public class DataRouterSubscriber implements HttpHandler { logger.exiting(); } } + + @Override + public void reconfigure(MapperConfig config) throws ReconfigurationException { + logger.unwrap().info("Checking new Configuration against existing."); + if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){ + logger.unwrap().info("DMaaP Info changes found, reconfiguring."); + try { + this.config = config; + this.updateSubscriber(); + } catch (TooManyTriesException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e); + } + } + + } } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java new file mode 100644 index 0000000..66e3aee --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ReconfigurationException.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class ReconfigurationException extends Exception { + public ReconfigurationException(String message) { + super(message); + } + public ReconfigurationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java index 2f13080..d28d850 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java @@ -81,6 +81,15 @@ public class MapperConfig { public String getSubscriberIdentity(){
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+
+ public boolean dmaapInfoEquals(MapperConfig mapperConfig){
+ return this
+ .getStreamsSubscribes()
+ .getDmaapSubscriber()
+ .getDmaapInfo()
+ .equals(mapperConfig.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo());
+ }
+
@Getter
@EqualsAndHashCode
private class StreamsSubscribes {
diff --git a/src/main/resources/Dockerfile b/src/main/resources/Dockerfile index e2b6b89..2f4505c 100644 --- a/src/main/resources/Dockerfile +++ b/src/main/resources/Dockerfile @@ -19,7 +19,7 @@ # FROM openjdk:8-jre-alpine -ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/pm-mapper/pm-mapper.jar"] +ENTRYPOINT ["/bin/sh", "-c", "nohup sh etc/reconfigure.sh & /usr/bin/java -jar /opt/app/pm-mapper/pm-mapper.jar"] ARG JAR WORKDIR /opt/app/pm-mapper @@ -32,5 +32,6 @@ RUN addgroup -S onap \ ADD target/${JAR} /opt/app/pm-mapper/pm-mapper.jar ADD target/classes/mapping.ftl /opt/app/pm-mapper/etc/mapping.ftl ADD target/classes/measCollec_plusString.xsd /opt/app/pm-mapper/etc/measCollec_plusString.xsd +ADD target/classes/reconfigure.sh /opt/app/pm-mapper/etc/reconfigure.sh USER pm-mapper diff --git a/src/main/resources/reconfigure.sh b/src/main/resources/reconfigure.sh new file mode 100644 index 0000000..ac6f940 --- /dev/null +++ b/src/main/resources/reconfigure.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env sh +while true +do + sleep 60 + echo $(wget -S --spider localhost:8081/reconfigure 2>&1) >> /var/log/ONAP/dcaegen2/services/pm-mapper/reconfigure.log +done |