summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java50
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java6
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java278
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java119
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java7
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java3
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java38
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java4
-rw-r--r--src/main/resources/logback.xml2
9 files changed, 162 insertions, 345 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 25e3918..a5eb68d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -28,13 +28,11 @@ import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
import org.onap.dcaegen2.services.pmmapper.config.Configurable;
import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
-import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
@@ -67,7 +65,7 @@ public class App {
private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
private static FluxSink<Event> fluxSink;
- public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
+ public static void main(String[] args) throws EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
@@ -86,18 +84,16 @@ public class App {
.runOn(Schedulers.newParallel(""), 1)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
.filter(metadataFilter::filter)
- .filter(filterHandler::filterByFileType)
- .filter(validator::validate)
+ .filter(event -> App.filterByFileType(filterHandler, event, mapperConfig))
+ .filter(event -> App.validate(validator, event, mapperConfig))
.concatMap(event -> App.split(splitter,event, mapperConfig))
.filter(events -> App.filter(filterHandler, events, mapperConfig))
.concatMap(events -> App.map(mapper, events, mapperConfig))
.concatMap(vesPublisher::publish)
.subscribe(event -> App.sendEventProcessed(mapperConfig, event));
- DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
- dataRouterSubscriber.start();
+ DeliveryHandler deliveryHandler = new DeliveryHandler(fluxSink::next);
ArrayList<Configurable> configurables = new ArrayList<>();
- configurables.add(dataRouterSubscriber);
configurables.add(mapperConfig);
DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
@@ -113,12 +109,40 @@ public class App {
builder.addHttpsListener(8443, "0.0.0.0", sslContext)
.setHandler(Handlers.routing()
- .add("put", "/delivery/{filename}", dataRouterSubscriber)
+ .add("put", "/delivery/{filename}", deliveryHandler)
.add("get", "/healthcheck", healthCheckHandler)
.add("get", "/reconfigure", dynamicConfiguration))
.build().start();
}
+ public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
+ boolean hasValidFileName = false;
+ try {
+ hasValidFileName = filterHandler.filterByFileType(event);
+ if(!hasValidFileName) {
+ sendEventProcessed(config,event);
+ }
+ } catch (Exception exception) {
+ logger.unwrap().error("Unable to filter by file type", exception);
+ sendEventProcessed(config,event);
+ }
+ return hasValidFileName;
+ }
+
+ public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
+ boolean isValidXML = false;
+ try {
+ isValidXML = validator.validate(event);
+ if(!isValidXML) {
+ sendEventProcessed(config,event);
+ }
+ } catch (Exception exception) {
+ logger.unwrap().error("Unable to validate XML",exception);
+ sendEventProcessed(config,event);
+ }
+ return isValidXML;
+ }
+
public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
Event event = events.get(0);
boolean hasMatchingFilter = false;
@@ -128,7 +152,7 @@ public class App {
sendEventProcessed(config,event);
}
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to filter by Meas Types",exception);
sendEventProcessed(config,event);
}
return hasMatchingFilter;
@@ -139,7 +163,7 @@ public class App {
try {
mappedEvents = mapper.mapEvents(events);
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to map XML to VES",exception);
sendEventProcessed(config,events.get(0));
return Flux.<List<Event>>empty();
}
@@ -151,7 +175,7 @@ public class App {
try {
splitEvents = splitter.split(event);
} catch (Exception exception) {
- logger.unwrap().error(exception.getMessage(),exception);
+ logger.unwrap().error("Unable to split MeasCollecFile",exception);
sendEventProcessed(config,event);
return Flux.<List<Event>>empty();
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
index fe2f247..e50ec6c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
@@ -59,14 +59,14 @@ public class ConfigHandler {
/**
* Retrieves PM-Mapper Configuration from DCAE's ConfigBinding Service.
- *
+ *
* @throws EnvironmentConfigException
* @throws ConsulServerError
* @throws CBSConfigException
* @throws CBSServerError
* @throws MapperConfigException
- */
- public MapperConfig getMapperConfig() throws CBSConfigException, EnvironmentConfigException,
+ */
+ public MapperConfig getMapperConfig() throws EnvironmentConfigException,
CBSServerError, MapperConfigException {
String mapperConfigJson = "";
String cbsSocketAddress = EnvironmentConfig.getCBSHostName() + ":" + EnvironmentConfig.getCBSPort();
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
deleted file mode 100644
index a0a8eaf..0000000
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.pmmapper.datarouter;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import io.undertow.util.HeaderValues;
-import lombok.Data;
-import lombok.NonNull;
-
-import org.onap.dcaegen2.services.pmmapper.config.Configurable;
-import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.model.Event;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.StatusCodes;
-
-import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
-import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
-import org.onap.logging.ref.slf4j.ONAPLogAdapter;
-import org.onap.logging.ref.slf4j.ONAPLogConstants;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-/**
- * Subscriber for events sent from data router
- * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
- */
-@Data
-public class DataRouterSubscriber implements HttpHandler, Configurable {
- public static final String METADATA_HEADER = "X-DMAAP-DR-META";
- public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
-
- private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
- private static final int NUMBER_OF_ATTEMPTS = 5;
- private static final int DEFAULT_TIMEOUT = 2000;
- private static final int MAX_JITTER = 50;
-
- private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
- private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
-
- private boolean limited = false;
- private Random jitterGenerator;
- private Gson metadataBuilder;
- private MapperConfig config;
- public static String subscriberId;
- @NonNull
- private EventReceiver eventReceiver;
-
- /**
- * @param eventReceiver receiver for any inbound events.
- */
- public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) {
- this.eventReceiver = eventReceiver;
- this.jitterGenerator = new Random();
- this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
- .create();
- this.config = config;
- this.subscriberId="";
- }
-
- /**
- * Starts data flow by subscribing to data router through bus controller.
- *
- * @throws TooManyTriesException in the event that timeout has occurred several times.
- */
- public void start() throws TooManyTriesException, InterruptedException {
- try {
- logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
- subscribe();
- logger.unwrap().info("Successfully started DR Subscriber");
- } finally {
- logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
- }
- }
-
- private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException {
- HttpURLConnection connection = (HttpURLConnection) resource.openConnection();
- connection.setRequestMethod(method);
- connection.setConnectTimeout(timeout);
- connection.setReadTimeout(timeout);
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setDoOutput(true);
-
- final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
- final UUID requestID = UUID.randomUUID();
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
-
- return connection;
- }
-
- private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
- JsonObject subscriberObj = new JsonObject();
- subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
- subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
- subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
- subscriberObj.addProperty("lastMod", Instant.now().toString());
- subscriberObj.addProperty("username", config.getBusControllerUserName());
- subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
- subscriberObj.addProperty("privilegedSubscriber", true);
- return subscriberObj;
- }
-
- private void processResponse(HttpURLConnection connection) throws IOException {
- try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
- String body = responseBody.lines().collect(Collectors.joining(""));
- updateSubscriberId(body);
- } catch (IOException | JsonSyntaxException | IllegalStateException e) {
- throw new IOException("Failed to process response", e);
- }
- }
-
- private void updateSubscriberId(String responseBody) {
- JsonParser parser = new JsonParser();
- JsonObject responseObject = parser.parse(responseBody).getAsJsonObject();
- this.subscriberId = responseObject.get("subId").getAsString();
- }
-
- private void subscribe() throws TooManyTriesException, InterruptedException {
- try {
- URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
- JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
- request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody);
- } catch (MalformedURLException e) {
- throw new IllegalStateException("Subscription URL is malformed", e);
- }
-
- }
- private void updateSubscriber() throws TooManyTriesException, InterruptedException {
- try {
- URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
- URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId));
- JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
- request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody);
- } catch (MalformedURLException e) {
- throw new IllegalStateException("Subscription URL is malformed", e);
- }
- }
-
- private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException {
- int subResponse = 504;
- String subMessage = "";
- boolean processFailure = false;
- try {
- HttpURLConnection connection = getBusControllerConnection(method, resource, timeout);
- try (OutputStream bodyStream = connection.getOutputStream();
- OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
- bodyWriter.write(subscribeBody.toString());
- }
- subResponse = connection.getResponseCode();
- subMessage = connection.getResponseMessage();
- if (subResponse < 300) {
- processResponse(connection);
- }
- } catch (IOException e) {
- logger.unwrap().error("Failure to process response", e);
- processFailure = true;
- }
- logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
- if ((subResponse >= 300 || processFailure) && attempts > 1 ) {
- Thread.sleep(timeout);
- request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody);
- } else if (subResponse >= 300 || processFailure) {
- throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
- }
- }
-
- private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
- String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
- .get(METADATA_HEADER))
- .map((HeaderValues headerValues) -> headerValues.get(0))
- .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
- return metadataBuilder.fromJson(metadata, EventMetadata.class);
- }
-
- /**
- * Receives inbound requests, verifies that required headers are valid
- * and passes an Event onto the eventReceiver.
- * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
- *
- * @param httpServerExchange inbound http server exchange.
- */
- @Override
- public void handleRequest(HttpServerExchange httpServerExchange) {
- try{
- logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
- if (limited) {
- httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
- .getResponseSender()
- .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
- } else {
- try {
-
- Map<String,String> mdc = MDC.getCopyOfContextMap();
- EventMetadata metadata = getMetadata(httpServerExchange);
- String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
- httpServerExchange.getRequestReceiver()
- .receiveFullString((callbackExchange, body) ->
- httpServerExchange.dispatch(() ->
- eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
- );
- } catch (NoMetadataException exception) {
- logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
- httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
- .getResponseSender()
- .send(NO_METADATA_MESSAGE);
- } catch (JsonParseException exception) {
- logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
- httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
- .getResponseSender()
- .send(BAD_METADATA_MESSAGE);
- }
- }
- } finally {
- logger.exiting();
- }
- }
-
- @Override
- public void reconfigure(MapperConfig config) throws ReconfigurationException {
- logger.unwrap().info("Checking new Configuration against existing.");
- if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){
- logger.unwrap().info("DMaaP Info changes found, reconfiguring.");
- try {
- this.config = config;
- this.updateSubscriber();
- } catch (TooManyTriesException | InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e);
- }
- }
-
- }
-}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java
new file mode 100644
index 0000000..4d6af29
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Data
+public class DeliveryHandler implements HttpHandler {
+
+ public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+ public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DeliveryHandler.class));
+
+ private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+ private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+ private Gson metadataBuilder;
+
+ @NonNull
+ private EventReceiver eventReceiver;
+
+ /**
+ * @param eventReceiver receiver for any inbound events.
+ */
+ public DeliveryHandler(EventReceiver eventReceiver) {
+ this.eventReceiver = eventReceiver;
+ this.metadataBuilder = new GsonBuilder()
+ .registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+ .create();
+ }
+
+ private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
+ String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
+ .get(METADATA_HEADER))
+ .map((HeaderValues headerValues) -> headerValues.get(0))
+ .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+ return metadataBuilder.fromJson(metadata, EventMetadata.class);
+ }
+
+ /**
+ * Receives inbound requests, verifies that required headers are valid
+ * and passes an Event onto the eventReceiver.
+ * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+ *
+ * @param httpServerExchange inbound http server exchange.
+ */
+ @Override
+ public void handleRequest(HttpServerExchange httpServerExchange) {
+ try{
+ logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
+ try {
+ Map<String,String> mdc = MDC.getCopyOfContextMap();
+ EventMetadata metadata = getMetadata(httpServerExchange);
+ String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
+ httpServerExchange.getRequestReceiver()
+ .receiveFullString((callbackExchange, body) ->
+ httpServerExchange.dispatch(() ->
+ eventReceiver.receive(new Event(
+ callbackExchange, body, metadata, mdc, publishIdentity)))
+ );
+ } catch (NoMetadataException exception) {
+ logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(NO_METADATA_MESSAGE);
+ } catch (JsonParseException exception) {
+ logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(BAD_METADATA_MESSAGE);
+ }
+ } finally {
+ logger.exiting();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
index 20c8a64..1fb6019 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
@@ -22,7 +22,6 @@ package org.onap.dcaegen2.services.pmmapper.filtering;
import lombok.NonNull;
import org.onap.dcaegen2.services.pmmapper.exceptions.*;
-import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -48,7 +47,6 @@ public class MetadataFilter {
* @param event inbound event
*/
public boolean filter(@NonNull Event event) {
- String decompressionStatus;
logger.unwrap().info("Filtering event metadata");
EventMetadata metadata = event.getMetadata();
@@ -56,11 +54,6 @@ public class MetadataFilter {
List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
- if(metadata.getDecompressionStatus() != null) {
- decompressionStatus = metadata.getDecompressionStatus();
- logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
- }
-
if(filters.isEmpty()) {
logger.unwrap().info("No filter specified in config: {}", filters);
return true;
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
index 8a0977d..601b00f 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -19,7 +19,6 @@
*/
package org.onap.dcaegen2.services.pmmapper.model;
-import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@@ -48,6 +47,4 @@ public class EventMetadata {
private String fileFormatType;
@GSONRequired
private String fileFormatVersion;
- @SerializedName("decompression_status")
- private String decompressionStatus;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index b9d58ee..390fa0d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -19,9 +19,6 @@
*/
package org.onap.dcaegen2.services.pmmapper.model;
-import java.net.MalformedURLException;
-import java.net.URL;
-
import org.onap.dcaegen2.services.pmmapper.config.Configurable;
import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
import com.google.gson.annotations.SerializedName;
@@ -68,14 +65,6 @@ public class MapperConfig implements Configurable{
private StreamsPublishes streamsPublishes;
@GSONRequired
- @SerializedName("buscontroller_feed_subscription_endpoint")
- private String busControllerSubscriptionEndpoint;
-
- @GSONRequired
- @SerializedName("dmaap_dr_feed_id")
- private String dmaapDRFeedId;
-
- @GSONRequired
@SerializedName("dmaap_dr_delete_endpoint")
private String dmaapDRDeleteEndpoint;
@@ -83,34 +72,10 @@ public class MapperConfig implements Configurable{
@SerializedName("pm-mapper-filter")
private MeasFilterConfig filterConfig;
- public String getBusControllerDeliveryUrl() {
- return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getDeliveryUrl();
- }
-
- public String getDcaeLocation() {
- return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
- }
-
- public String getBusControllerUserName() {
- return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getUsername();
- }
-
- public String getBusControllerPassword() {
- return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getPassword();
- }
-
- public URL getBusControllerSubscriptionUrl() throws MalformedURLException {
- return new URL(this.getBusControllerSubscriptionEndpoint());
- }
-
public String getSubscriberIdentity(){
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
- public String getSubscriberDcaeLocation() {
- return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
- }
-
public String getPublisherTopicUrl() {
return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();
}
@@ -187,10 +152,9 @@ public class MapperConfig implements Configurable{
@Override
public void reconfigure(MapperConfig mapperConfig) {
if(!this.equals(mapperConfig)) {
+ this.filterConfig = mapperConfig.getFilterConfig();
this.streamsSubscribes = mapperConfig.getStreamsSubscribes();
this.streamsPublishes = mapperConfig.getStreamsPublishes();
- this.busControllerSubscriptionEndpoint = mapperConfig.getBusControllerSubscriptionEndpoint();
- this.dmaapDRFeedId = mapperConfig.getDmaapDRFeedId();
this.dmaapDRDeleteEndpoint = mapperConfig.getDmaapDRDeleteEndpoint();
}
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
index 5147863..23e8d71 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.services.pmmapper.utils;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -42,12 +41,11 @@ public class DataRouterUtils {
public static String processEvent(MapperConfig config, Event event){
logger.unwrap().info("Sending processed to DataRouter");
String baseDelete = config.getDmaapDRDeleteEndpoint();
- String subscriberIdentity = DataRouterSubscriber.subscriberId;
+ String subscriberIdentity = config.getSubscriberIdentity();
String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
try {
return new RequestSender().send("DELETE", delete);
} catch (Exception exception) {
- logger.unwrap().error("Process event failure", exception);
throw new ProcessEventException("Process event failure", exception);
}
}
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 0d5d83c..dff2f8b 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -15,7 +15,7 @@
<property name="p_thr" value="%thread"/>
<property name="pattern" value="%nopexception${p_tim}\t${p_thr}\t${p_lvl}\t${p_log}\t${p_mdc}\t${p_msg}\t${p_exc}\t${p_mak}\t%n"/>
- <variable name="logLevel" value="${LOG_LEVEL:-INFO}"/>
+ <variable name="logLevel" value="${LOG_LEVEL:-DEBUG}"/>
<logger name="org.mockserver" level="${mockserver.logLevel:-OFF}"/>