diff options
author | JoeOLeary <joseph.o.leary@est.tech> | 2019-01-22 16:09:34 +0000 |
---|---|---|
committer | JoeOLeary <joseph.o.leary@est.tech> | 2019-01-22 16:09:34 +0000 |
commit | 8338d2e36a8aba7907a78025b8381d2cb05eff06 (patch) | |
tree | 6e0cc04a89ab8aad68c9c0ddd57a0d9b62ebec64 /src | |
parent | 7e0818cfb729efa7617893b8cb6f9afb0faba83f (diff) |
Add DataRouter Subscriber
Change-Id: I7f81c3d7249dcfcf00e2c7f3272f478d2346397d
Issue-ID: DCAEGEN2-1079
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src')
14 files changed, 780 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java new file mode 100644 index 0000000..2b93d03 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper; + +import io.undertow.Handlers; +import io.undertow.Undertow; +import io.undertow.util.StatusCodes; +import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig; +import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; +import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; + +import java.net.MalformedURLException; +import java.net.URL; + +public class App { + + public static void main(String[] args) throws MalformedURLException, InterruptedException, TooManyTriesException { + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> { + event.getHttpServerExchange().unDispatch(); + event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING); + System.out.println(event.getMetadata().getProductName()); + }); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(new URL("http://" + System.getenv("DMAAP_BC_SERVICE_HOST") + ":" + System.getenv("DMAAP_BC_SERVICE_PORT") + "/webapi/dr_subs")); + dataRouterSubscriber.start(config); + + Undertow.builder() + .addHttpListener(8081, "0.0.0.0") + .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber)) + .build().start(); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java new file mode 100644 index 0000000..63b2a32 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.config; + +import lombok.Data; + +import java.net.URL; + +/** + * Stub for BusControllerConfiguration object. + */ +@Data +public class BusControllerConfig { + + private String dcaeLocation = "dcaeLocation"; + private String deliveryURL = "deliveryURL"; + private int feedId = 2; + private String lastMod = "lastMod"; + private String username = "username"; + private String password = "password"; + private URL dataRouterSubscribeEndpoint; + +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java new file mode 100644 index 0000000..1d27d3b --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -0,0 +1,177 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.datarouter; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.undertow.util.HeaderValues; +import lombok.Data; +import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig; +import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException; +import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; + +import lombok.extern.slf4j.Slf4j; +import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.Random; + +/** + * Subscriber for events sent from data router + * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to. + */ +@Slf4j +@Data +public class DataRouterSubscriber implements HttpHandler { + + private static final int NUMBER_OF_ATTEMPTS = 5; + private static final int DEFAULT_TIMEOUT = 2000; + private static final int MAX_JITTER = 50; + + private static final String METADATA_HEADER = "X-ATT-DR-META"; + private static final String BAD_METADATA_MESSAGE = "Malformed Metadata."; + private static final String NO_METADATA_MESSAGE = "Missing Metadata."; + + private boolean limited = false; + private Random jitterGenerator; + private Gson metadataBuilder; + @NonNull + private EventReceiver eventReceiver; + + /** + * @param eventReceiver receiver for any inbound events. + */ + public DataRouterSubscriber(EventReceiver eventReceiver) { + this.eventReceiver = eventReceiver; + this.jitterGenerator = new Random(); + this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>()) + .create(); + } + + /** + * Starts data flow by subscribing to data router through bus controller. + * + * @param config configuration object containing bus controller endpoint for subscription and + * all non constant configuration for subscription through this endpoint. + * @throws TooManyTriesException in the event that timeout has occurred several times. + */ + public void start(BusControllerConfig config) throws TooManyTriesException, InterruptedException { + subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + } + + private HttpURLConnection getBusControllerConnection(BusControllerConfig config, int timeout) throws IOException { + HttpURLConnection connection = (HttpURLConnection) config.getDataRouterSubscribeEndpoint() + .openConnection(); + connection.setRequestMethod("POST"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); + return connection; + } + + private JsonObject getBusControllerSubscribeBody(BusControllerConfig config) { + JsonObject subscriberObj = new JsonObject(); + subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation()); + subscriberObj.addProperty("deliveryURL", config.getDeliveryURL()); + subscriberObj.addProperty("feedId", config.getFeedId()); + subscriberObj.addProperty("lastMod", config.getLastMod()); + subscriberObj.addProperty("username", config.getUsername()); + subscriberObj.addProperty("userpwd", config.getPassword()); + return subscriberObj; + } + + private void subscribe(int attempts, int timeout, BusControllerConfig config) throws TooManyTriesException, InterruptedException { + int subResponse = 504; + String subMessage = ""; + try { + HttpURLConnection connection = getBusControllerConnection(config, timeout); + + try (OutputStream bodyStream = connection.getOutputStream(); + OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) { + bodyWriter.write(getBusControllerSubscribeBody(config).toString()); + } + subResponse = connection.getResponseCode(); + subMessage = connection.getResponseMessage(); + } catch (IOException e) { + log.info("Timeout Failure:", e); + } + log.info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage); + if (subResponse >= 300 && attempts > 1) { + Thread.sleep(timeout); + subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config); + } else if (subResponse >= 300) { + throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts"); + } + } + + /** + * Receives inbound requests, verifies that required headers are valid + * and passes an Event onto the eventReceiver. + * The forwarded httpServerExchange response is the responsibility of the eventReceiver. + * + * @param httpServerExchange inbound http server exchange. + */ + @Override + public void handleRequest(HttpServerExchange httpServerExchange) { + if (limited) { + httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE) + .getResponseSender() + .send(StatusCodes.SERVICE_UNAVAILABLE_STRING); + } else { + try { + String metadataAsString = Optional.of(httpServerExchange.getRequestHeaders() + .get(METADATA_HEADER)) + .map((HeaderValues headerValues) -> headerValues.get(0)) + .orElseThrow(() -> new NoMetadataException("Metadata Not found")); + + EventMetadata metadata = metadataBuilder.fromJson(metadataAsString, EventMetadata.class); + httpServerExchange.getRequestReceiver() + .receiveFullString((callbackExchange, body) -> { + httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata))); + }); + } catch (NoMetadataException exception) { + log.info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception); + httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST) + .getResponseSender() + .send(NO_METADATA_MESSAGE); + } catch (JsonParseException exception) { + log.info("Bad Request: Failure to parse metadata", exception); + httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST) + .getResponseSender() + .send(BAD_METADATA_MESSAGE); + } + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java new file mode 100644 index 0000000..77c8153 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java @@ -0,0 +1,31 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.datarouter; + + +import org.onap.dcaegen2.services.pmmapper.model.Event; + +/** + * Sink for Events received from the data router subscriber. + */ +public interface EventReceiver { + void receive(Event event); +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java new file mode 100644 index 0000000..280b9da --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class NoMetadataException extends Exception { + public NoMetadataException(String errorMessage) { + super(errorMessage); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java new file mode 100644 index 0000000..922239b --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.exceptions; + +/** + * Exception indicates that a task has been attempted too many times. + */ +public class TooManyTriesException extends Exception { + public TooManyTriesException(String errorMessage){ + super(errorMessage); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java new file mode 100644 index 0000000..a08dcfb --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.model; + +import io.undertow.server.HttpServerExchange; +import lombok.Data; +import lombok.NonNull; + +/** + * Class used to pass around relevant inbound event data. + */ +@Data +public class Event { + @NonNull + private HttpServerExchange httpServerExchange; + @NonNull + private String body; + @NonNull + private EventMetadata metadata; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java new file mode 100644 index 0000000..601b00f --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.model; + +import lombok.Data; +import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired; + +/** + * Metadata for inbound event onto data router subscriber. + */ +@Data +public class EventMetadata { + @GSONRequired + private String productName; + @GSONRequired + private String vendorName; + @GSONRequired + private String startEpochMicrosec; + @GSONRequired + private String lastEpochMicrosec; + @GSONRequired + private String sourceName; + @GSONRequired + private String timeZoneOffset; + @GSONRequired + private String location; + @GSONRequired + private String compression; + @GSONRequired + private String fileFormatType; + @GSONRequired + private String fileFormatVersion; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java new file mode 100644 index 0000000..a6ce7b9 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.utils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation used to make a field required for Gson. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface GSONRequired { +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java new file mode 100644 index 0000000..e956398 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.utils; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; + +import java.lang.reflect.Field; +import java.lang.reflect.Type; + + +/** + * Extension of the default deserializer with support for GSONRequired annotation. + * @param <T> Type of object for deserialization process. + */ +public class RequiredFieldDeserializer<T> implements JsonDeserializer<T> { + + @Override + public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException { + T obj = new Gson().fromJson(jsonElement, type); + for (Field field : obj.getClass().getDeclaredFields()) { + if (field.getAnnotation(GSONRequired.class) != null) { + field.setAccessible(true); + try { + if (field.get(obj) == null) { + throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName())); + } + } catch (Exception exception) { + throw new JsonParseException("Failed to check fields.", exception); + } + } + } + + return obj; + } + +} diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java new file mode 100644 index 0000000..8f73c91 --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java @@ -0,0 +1,225 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.datarouter; + +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig; +import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import io.undertow.io.Receiver; +import io.undertow.io.Sender; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + + +@RunWith(PowerMockRunner.class) +@PrepareForTest(DataRouterSubscriber.class) +public class DataRouterSubscriberTest { + + + @Mock + private EventReceiver eventReceiver; + + private DataRouterSubscriber objUnderTest; + + @Before + public void setUp() { + objUnderTest = new DataRouterSubscriber(eventReceiver); + } + + @Test + public void testStartTooManyTriesWithResponse() throws IOException { + PowerMockito.mockStatic(Thread.class); + + URL subURL = mock(URL.class); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(subURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(subURL.openConnection()).thenReturn(huc); + when(huc.getResponseCode()).thenReturn(300); + Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config)); + } + + @Test + public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException { + URL subURL = mock(URL.class); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(subURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(subURL.openConnection()).thenReturn(huc); + when(huc.getResponseCode()).thenReturn(200); + objUnderTest.start(config); + verify(huc, times(1)).getResponseCode(); + } + + @Test + public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException { + PowerMockito.mockStatic(Thread.class); + + URL subURL = mock(URL.class); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(subURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(subURL.openConnection()).thenReturn(huc); + doAnswer(new Answer() { + boolean forceRetry = true; + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + if (forceRetry) { + forceRetry = false; + throw new IOException(); + } + return 200; + } + }).when(huc).getResponseCode(); + objUnderTest.start(config); + verify(huc, times(2)).getResponseCode(); + } + + @Test + public void testStartReadTimeout() throws IOException { + PowerMockito.mockStatic(Thread.class); + + URL subURL = mock(URL.class); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(subURL); + HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS); + when(subURL.openConnection()).thenReturn(huc); + doThrow(new IOException()).when(huc).getResponseCode(); + Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config)); + } + + @Test + public void testRequestInboundLimitedStateServiceUnavailable() throws Exception { + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class); + Sender responseSender = mock(Sender.class); + when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange); + when(httpServerExchange.getResponseSender()).thenReturn(responseSender); + objUnderTest.setLimited(true); + objUnderTest.handleRequest(httpServerExchange); + verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE); + } + + @Test + public void testRequestInboundLimitedStateServiceNoEmission() throws Exception { + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class); + Sender responseSender = mock(Sender.class); + when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange); + when(httpServerExchange.getResponseSender()).thenReturn(responseSender); + objUnderTest.setLimited(true); + objUnderTest.handleRequest(httpServerExchange); + verify(eventReceiver, times(0)).receive(any()); + } + + + + @Test + public void testRequestInboundInvalidMetadata() throws Exception { + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject(); + when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString()); + when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange); + objUnderTest.handleRequest(httpServerExchange); + verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST); + verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata."); + + } + + @Test + public void testRequestInboundNoMetadata() throws Exception{ + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + Receiver receiver = mock(Receiver.class); + when(httpServerExchange.getRequestReceiver()).thenReturn(receiver); + when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange); + + doAnswer((Answer<Void>) invocationOnMock -> { + Receiver.FullStringCallback callback = invocationOnMock.getArgument(0); + callback.handle(httpServerExchange, ""); + return null; + }).when(receiver).receiveFullString(any()); + doAnswer((Answer<Void>) invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + runnable.run(); + return null; + }).when(httpServerExchange).dispatch(any(Runnable.class)); + objUnderTest.handleRequest(httpServerExchange); + verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST); + verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata."); + + } + + @Test + public void testRequestInboundSuccess() throws Exception { + HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS); + Receiver receiver = mock(Receiver.class); + when(httpServerExchange.getRequestReceiver()).thenReturn(receiver); + String testString = "MESSAGE BODY"; + JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject(); + EventMetadata metadataObj = new GsonBuilder().create().fromJson(metadata, EventMetadata.class); + + when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString()); + doAnswer((Answer<Void>) invocationOnMock -> { + Receiver.FullStringCallback callback = invocationOnMock.getArgument(0); + callback.handle(httpServerExchange, testString); + return null; + }).when(receiver).receiveFullString(any()); + + doAnswer((Answer<Void>) invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + runnable.run(); + return null; + }).when(httpServerExchange).dispatch(any(Runnable.class)); + + objUnderTest.handleRequest(httpServerExchange); + verify(eventReceiver, times(1)).receive(new Event(httpServerExchange, testString, metadataObj)); + } +} diff --git a/src/test/resources/invalid_metadata.json b/src/test/resources/invalid_metadata.json new file mode 100644 index 0000000..31600b0 --- /dev/null +++ b/src/test/resources/invalid_metadata.json @@ -0,0 +1,11 @@ +{ + "vendorName": "Ericsson", + "lastEpochMicrosec": "1538478000000", + "sourceName": "oteNB5309", + "startEpochMicrosec": "1538478900000", + "timeZoneOffset": "UTC+05.00", + "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz", + "compression": "gzip", + "fileFormatType": "org.3GPP.32.435#measCollec", + "fileFormatVersion": "V9" +}
\ No newline at end of file diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..1f0955d --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json new file mode 100644 index 0000000..21de3fb --- /dev/null +++ b/src/test/resources/valid_metadata.json @@ -0,0 +1,12 @@ +{ + "productName": "NrRadio", + "vendorName": "Ericsson", + "lastEpochMicrosec": "1538478000000", + "sourceName": "oteNB5309", + "startEpochMicrosec": "1538478900000", + "timeZoneOffset": "UTC+05.00", + "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz", + "compression": "gzip", + "fileFormatType": "org.3GPP.32.435#measCollec", + "fileFormatVersion": "V9" +}
\ No newline at end of file |