summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJoeOLeary <joseph.o.leary@est.tech>2019-01-22 16:09:34 +0000
committerJoeOLeary <joseph.o.leary@est.tech>2019-01-22 16:09:34 +0000
commit8338d2e36a8aba7907a78025b8381d2cb05eff06 (patch)
tree6e0cc04a89ab8aad68c9c0ddd57a0d9b62ebec64 /src
parent7e0818cfb729efa7617893b8cb6f9afb0faba83f (diff)
Add DataRouter Subscriber
Change-Id: I7f81c3d7249dcfcf00e2c7f3272f478d2346397d Issue-ID: DCAEGEN2-1079 Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java50
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java40
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java177
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java31
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java27
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java29
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java37
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java50
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java33
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java57
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java225
-rw-r--r--src/test/resources/invalid_metadata.json11
-rw-r--r--src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker1
-rw-r--r--src/test/resources/valid_metadata.json12
14 files changed, 780 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
new file mode 100644
index 0000000..2b93d03
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper;
+
+import io.undertow.Handlers;
+import io.undertow.Undertow;
+import io.undertow.util.StatusCodes;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class App {
+
+ public static void main(String[] args) throws MalformedURLException, InterruptedException, TooManyTriesException {
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> {
+ event.getHttpServerExchange().unDispatch();
+ event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING);
+ System.out.println(event.getMetadata().getProductName());
+ });
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(new URL("http://" + System.getenv("DMAAP_BC_SERVICE_HOST") + ":" + System.getenv("DMAAP_BC_SERVICE_PORT") + "/webapi/dr_subs"));
+ dataRouterSubscriber.start(config);
+
+ Undertow.builder()
+ .addHttpListener(8081, "0.0.0.0")
+ .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber))
+ .build().start();
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
new file mode 100644
index 0000000..63b2a32
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import lombok.Data;
+
+import java.net.URL;
+
+/**
+ * Stub for BusControllerConfiguration object.
+ */
+@Data
+public class BusControllerConfig {
+
+ private String dcaeLocation = "dcaeLocation";
+ private String deliveryURL = "deliveryURL";
+ private int feedId = 2;
+ private String lastMod = "lastMod";
+ private String username = "username";
+ private String password = "password";
+ private URL dataRouterSubscribeEndpoint;
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
new file mode 100644
index 0000000..1d27d3b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -0,0 +1,177 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Random;
+
+/**
+ * Subscriber for events sent from data router
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Slf4j
+@Data
+public class DataRouterSubscriber implements HttpHandler {
+
+ private static final int NUMBER_OF_ATTEMPTS = 5;
+ private static final int DEFAULT_TIMEOUT = 2000;
+ private static final int MAX_JITTER = 50;
+
+ private static final String METADATA_HEADER = "X-ATT-DR-META";
+ private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+ private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+ private boolean limited = false;
+ private Random jitterGenerator;
+ private Gson metadataBuilder;
+ @NonNull
+ private EventReceiver eventReceiver;
+
+ /**
+ * @param eventReceiver receiver for any inbound events.
+ */
+ public DataRouterSubscriber(EventReceiver eventReceiver) {
+ this.eventReceiver = eventReceiver;
+ this.jitterGenerator = new Random();
+ this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+ .create();
+ }
+
+ /**
+ * Starts data flow by subscribing to data router through bus controller.
+ *
+ * @param config configuration object containing bus controller endpoint for subscription and
+ * all non constant configuration for subscription through this endpoint.
+ * @throws TooManyTriesException in the event that timeout has occurred several times.
+ */
+ public void start(BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+ subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ }
+
+ private HttpURLConnection getBusControllerConnection(BusControllerConfig config, int timeout) throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) config.getDataRouterSubscribeEndpoint()
+ .openConnection();
+ connection.setRequestMethod("POST");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setDoOutput(true);
+ return connection;
+ }
+
+ private JsonObject getBusControllerSubscribeBody(BusControllerConfig config) {
+ JsonObject subscriberObj = new JsonObject();
+ subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+ subscriberObj.addProperty("deliveryURL", config.getDeliveryURL());
+ subscriberObj.addProperty("feedId", config.getFeedId());
+ subscriberObj.addProperty("lastMod", config.getLastMod());
+ subscriberObj.addProperty("username", config.getUsername());
+ subscriberObj.addProperty("userpwd", config.getPassword());
+ return subscriberObj;
+ }
+
+ private void subscribe(int attempts, int timeout, BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+ int subResponse = 504;
+ String subMessage = "";
+ try {
+ HttpURLConnection connection = getBusControllerConnection(config, timeout);
+
+ try (OutputStream bodyStream = connection.getOutputStream();
+ OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
+ bodyWriter.write(getBusControllerSubscribeBody(config).toString());
+ }
+ subResponse = connection.getResponseCode();
+ subMessage = connection.getResponseMessage();
+ } catch (IOException e) {
+ log.info("Timeout Failure:", e);
+ }
+ log.info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
+ if (subResponse >= 300 && attempts > 1) {
+ Thread.sleep(timeout);
+ subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config);
+ } else if (subResponse >= 300) {
+ throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
+ }
+ }
+
+ /**
+ * Receives inbound requests, verifies that required headers are valid
+ * and passes an Event onto the eventReceiver.
+ * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+ *
+ * @param httpServerExchange inbound http server exchange.
+ */
+ @Override
+ public void handleRequest(HttpServerExchange httpServerExchange) {
+ if (limited) {
+ httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
+ .getResponseSender()
+ .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
+ } else {
+ try {
+ String metadataAsString = Optional.of(httpServerExchange.getRequestHeaders()
+ .get(METADATA_HEADER))
+ .map((HeaderValues headerValues) -> headerValues.get(0))
+ .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+
+ EventMetadata metadata = metadataBuilder.fromJson(metadataAsString, EventMetadata.class);
+ httpServerExchange.getRequestReceiver()
+ .receiveFullString((callbackExchange, body) -> {
+ httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata)));
+ });
+ } catch (NoMetadataException exception) {
+ log.info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(NO_METADATA_MESSAGE);
+ } catch (JsonParseException exception) {
+ log.info("Bad Request: Failure to parse metadata", exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(BAD_METADATA_MESSAGE);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
new file mode 100644
index 0000000..77c8153
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+
+/**
+ * Sink for Events received from the data router subscriber.
+ */
+public interface EventReceiver {
+ void receive(Event event);
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
new file mode 100644
index 0000000..280b9da
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class NoMetadataException extends Exception {
+ public NoMetadataException(String errorMessage) {
+ super(errorMessage);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
new file mode 100644
index 0000000..922239b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
@@ -0,0 +1,29 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+/**
+ * Exception indicates that a task has been attempted too many times.
+ */
+public class TooManyTriesException extends Exception {
+ public TooManyTriesException(String errorMessage){
+ super(errorMessage);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
new file mode 100644
index 0000000..a08dcfb
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import io.undertow.server.HttpServerExchange;
+import lombok.Data;
+import lombok.NonNull;
+
+/**
+ * Class used to pass around relevant inbound event data.
+ */
+@Data
+public class Event {
+ @NonNull
+ private HttpServerExchange httpServerExchange;
+ @NonNull
+ private String body;
+ @NonNull
+ private EventMetadata metadata;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
new file mode 100644
index 0000000..601b00f
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import lombok.Data;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
+
+/**
+ * Metadata for inbound event onto data router subscriber.
+ */
+@Data
+public class EventMetadata {
+ @GSONRequired
+ private String productName;
+ @GSONRequired
+ private String vendorName;
+ @GSONRequired
+ private String startEpochMicrosec;
+ @GSONRequired
+ private String lastEpochMicrosec;
+ @GSONRequired
+ private String sourceName;
+ @GSONRequired
+ private String timeZoneOffset;
+ @GSONRequired
+ private String location;
+ @GSONRequired
+ private String compression;
+ @GSONRequired
+ private String fileFormatType;
+ @GSONRequired
+ private String fileFormatVersion;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
new file mode 100644
index 0000000..a6ce7b9
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used to make a field required for Gson.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface GSONRequired {
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
new file mode 100644
index 0000000..e956398
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+
+
+/**
+ * Extension of the default deserializer with support for GSONRequired annotation.
+ * @param <T> Type of object for deserialization process.
+ */
+public class RequiredFieldDeserializer<T> implements JsonDeserializer<T> {
+
+ @Override
+ public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+ T obj = new Gson().fromJson(jsonElement, type);
+ for (Field field : obj.getClass().getDeclaredFields()) {
+ if (field.getAnnotation(GSONRequired.class) != null) {
+ field.setAccessible(true);
+ try {
+ if (field.get(obj) == null) {
+ throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName()));
+ }
+ } catch (Exception exception) {
+ throw new JsonParseException("Failed to check fields.", exception);
+ }
+ }
+ }
+
+ return obj;
+ }
+
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
new file mode 100644
index 0000000..8f73c91
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
@@ -0,0 +1,225 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import io.undertow.io.Receiver;
+import io.undertow.io.Sender;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DataRouterSubscriber.class)
+public class DataRouterSubscriberTest {
+
+
+ @Mock
+ private EventReceiver eventReceiver;
+
+ private DataRouterSubscriber objUnderTest;
+
+ @Before
+ public void setUp() {
+ objUnderTest = new DataRouterSubscriber(eventReceiver);
+ }
+
+ @Test
+ public void testStartTooManyTriesWithResponse() throws IOException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(300);
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ }
+
+ @Test
+ public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(200);
+ objUnderTest.start(config);
+ verify(huc, times(1)).getResponseCode();
+ }
+
+ @Test
+ public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ doAnswer(new Answer() {
+ boolean forceRetry = true;
+
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (forceRetry) {
+ forceRetry = false;
+ throw new IOException();
+ }
+ return 200;
+ }
+ }).when(huc).getResponseCode();
+ objUnderTest.start(config);
+ verify(huc, times(2)).getResponseCode();
+ }
+
+ @Test
+ public void testStartReadTimeout() throws IOException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ doThrow(new IOException()).when(huc).getResponseCode();
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ }
+
+ @Test
+ public void testRequestInboundLimitedStateServiceUnavailable() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+ Sender responseSender = mock(Sender.class);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+ objUnderTest.setLimited(true);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
+ }
+
+ @Test
+ public void testRequestInboundLimitedStateServiceNoEmission() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+ Sender responseSender = mock(Sender.class);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+ objUnderTest.setLimited(true);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(eventReceiver, times(0)).receive(any());
+ }
+
+
+
+ @Test
+ public void testRequestInboundInvalidMetadata() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject();
+ when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+ verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
+
+ }
+
+ @Test
+ public void testRequestInboundNoMetadata() throws Exception{
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ Receiver receiver = mock(Receiver.class);
+ when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+ callback.handle(httpServerExchange, "");
+ return null;
+ }).when(receiver).receiveFullString(any());
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Runnable runnable = invocationOnMock.getArgument(0);
+ runnable.run();
+ return null;
+ }).when(httpServerExchange).dispatch(any(Runnable.class));
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+ verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
+
+ }
+
+ @Test
+ public void testRequestInboundSuccess() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ Receiver receiver = mock(Receiver.class);
+ when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+ String testString = "MESSAGE BODY";
+ JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
+ EventMetadata metadataObj = new GsonBuilder().create().fromJson(metadata, EventMetadata.class);
+
+ when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+ callback.handle(httpServerExchange, testString);
+ return null;
+ }).when(receiver).receiveFullString(any());
+
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Runnable runnable = invocationOnMock.getArgument(0);
+ runnable.run();
+ return null;
+ }).when(httpServerExchange).dispatch(any(Runnable.class));
+
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(eventReceiver, times(1)).receive(new Event(httpServerExchange, testString, metadataObj));
+ }
+}
diff --git a/src/test/resources/invalid_metadata.json b/src/test/resources/invalid_metadata.json
new file mode 100644
index 0000000..31600b0
--- /dev/null
+++ b/src/test/resources/invalid_metadata.json
@@ -0,0 +1,11 @@
+{
+ "vendorName": "Ericsson",
+ "lastEpochMicrosec": "1538478000000",
+ "sourceName": "oteNB5309",
+ "startEpochMicrosec": "1538478900000",
+ "timeZoneOffset": "UTC+05.00",
+ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+ "compression": "gzip",
+ "fileFormatType": "org.3GPP.32.435#measCollec",
+ "fileFormatVersion": "V9"
+} \ No newline at end of file
diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..1f0955d
--- /dev/null
+++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json
new file mode 100644
index 0000000..21de3fb
--- /dev/null
+++ b/src/test/resources/valid_metadata.json
@@ -0,0 +1,12 @@
+{
+ "productName": "NrRadio",
+ "vendorName": "Ericsson",
+ "lastEpochMicrosec": "1538478000000",
+ "sourceName": "oteNB5309",
+ "startEpochMicrosec": "1538478900000",
+ "timeZoneOffset": "UTC+05.00",
+ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+ "compression": "gzip",
+ "fileFormatType": "org.3GPP.32.435#measCollec",
+ "fileFormatVersion": "V9"
+} \ No newline at end of file