diff options
author | 2019-01-22 16:09:34 +0000 | |
---|---|---|
committer | 2019-01-22 16:09:34 +0000 | |
commit | 8338d2e36a8aba7907a78025b8381d2cb05eff06 (patch) | |
tree | 6e0cc04a89ab8aad68c9c0ddd57a0d9b62ebec64 /src/main/java | |
parent | 7e0818cfb729efa7617893b8cb6f9afb0faba83f (diff) |
Add DataRouter Subscriber
Change-Id: I7f81c3d7249dcfcf00e2c7f3272f478d2346397d
Issue-ID: DCAEGEN2-1079
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
Diffstat (limited to 'src/main/java')
10 files changed, 531 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java new file mode 100644 index 0000000..2b93d03 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper; + +import io.undertow.Handlers; +import io.undertow.Undertow; +import io.undertow.util.StatusCodes; +import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig; +import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber; +import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; + +import java.net.MalformedURLException; +import java.net.URL; + +public class App { + + public static void main(String[] args) throws MalformedURLException, InterruptedException, TooManyTriesException { + DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> { + event.getHttpServerExchange().unDispatch(); + event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING); + System.out.println(event.getMetadata().getProductName()); + }); + BusControllerConfig config = new BusControllerConfig(); + config.setDataRouterSubscribeEndpoint(new URL("http://" + System.getenv("DMAAP_BC_SERVICE_HOST") + ":" + System.getenv("DMAAP_BC_SERVICE_PORT") + "/webapi/dr_subs")); + dataRouterSubscriber.start(config); + + Undertow.builder() + .addHttpListener(8081, "0.0.0.0") + .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber)) + .build().start(); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java new file mode 100644 index 0000000..63b2a32 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.config; + +import lombok.Data; + +import java.net.URL; + +/** + * Stub for BusControllerConfiguration object. + */ +@Data +public class BusControllerConfig { + + private String dcaeLocation = "dcaeLocation"; + private String deliveryURL = "deliveryURL"; + private int feedId = 2; + private String lastMod = "lastMod"; + private String username = "username"; + private String password = "password"; + private URL dataRouterSubscribeEndpoint; + +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java new file mode 100644 index 0000000..1d27d3b --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java @@ -0,0 +1,177 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.datarouter; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.undertow.util.HeaderValues; +import lombok.Data; +import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig; +import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException; +import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; + +import lombok.extern.slf4j.Slf4j; +import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.Random; + +/** + * Subscriber for events sent from data router + * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to. + */ +@Slf4j +@Data +public class DataRouterSubscriber implements HttpHandler { + + private static final int NUMBER_OF_ATTEMPTS = 5; + private static final int DEFAULT_TIMEOUT = 2000; + private static final int MAX_JITTER = 50; + + private static final String METADATA_HEADER = "X-ATT-DR-META"; + private static final String BAD_METADATA_MESSAGE = "Malformed Metadata."; + private static final String NO_METADATA_MESSAGE = "Missing Metadata."; + + private boolean limited = false; + private Random jitterGenerator; + private Gson metadataBuilder; + @NonNull + private EventReceiver eventReceiver; + + /** + * @param eventReceiver receiver for any inbound events. + */ + public DataRouterSubscriber(EventReceiver eventReceiver) { + this.eventReceiver = eventReceiver; + this.jitterGenerator = new Random(); + this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>()) + .create(); + } + + /** + * Starts data flow by subscribing to data router through bus controller. + * + * @param config configuration object containing bus controller endpoint for subscription and + * all non constant configuration for subscription through this endpoint. + * @throws TooManyTriesException in the event that timeout has occurred several times. + */ + public void start(BusControllerConfig config) throws TooManyTriesException, InterruptedException { + subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config); + } + + private HttpURLConnection getBusControllerConnection(BusControllerConfig config, int timeout) throws IOException { + HttpURLConnection connection = (HttpURLConnection) config.getDataRouterSubscribeEndpoint() + .openConnection(); + connection.setRequestMethod("POST"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); + return connection; + } + + private JsonObject getBusControllerSubscribeBody(BusControllerConfig config) { + JsonObject subscriberObj = new JsonObject(); + subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation()); + subscriberObj.addProperty("deliveryURL", config.getDeliveryURL()); + subscriberObj.addProperty("feedId", config.getFeedId()); + subscriberObj.addProperty("lastMod", config.getLastMod()); + subscriberObj.addProperty("username", config.getUsername()); + subscriberObj.addProperty("userpwd", config.getPassword()); + return subscriberObj; + } + + private void subscribe(int attempts, int timeout, BusControllerConfig config) throws TooManyTriesException, InterruptedException { + int subResponse = 504; + String subMessage = ""; + try { + HttpURLConnection connection = getBusControllerConnection(config, timeout); + + try (OutputStream bodyStream = connection.getOutputStream(); + OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) { + bodyWriter.write(getBusControllerSubscribeBody(config).toString()); + } + subResponse = connection.getResponseCode(); + subMessage = connection.getResponseMessage(); + } catch (IOException e) { + log.info("Timeout Failure:", e); + } + log.info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage); + if (subResponse >= 300 && attempts > 1) { + Thread.sleep(timeout); + subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config); + } else if (subResponse >= 300) { + throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts"); + } + } + + /** + * Receives inbound requests, verifies that required headers are valid + * and passes an Event onto the eventReceiver. + * The forwarded httpServerExchange response is the responsibility of the eventReceiver. + * + * @param httpServerExchange inbound http server exchange. + */ + @Override + public void handleRequest(HttpServerExchange httpServerExchange) { + if (limited) { + httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE) + .getResponseSender() + .send(StatusCodes.SERVICE_UNAVAILABLE_STRING); + } else { + try { + String metadataAsString = Optional.of(httpServerExchange.getRequestHeaders() + .get(METADATA_HEADER)) + .map((HeaderValues headerValues) -> headerValues.get(0)) + .orElseThrow(() -> new NoMetadataException("Metadata Not found")); + + EventMetadata metadata = metadataBuilder.fromJson(metadataAsString, EventMetadata.class); + httpServerExchange.getRequestReceiver() + .receiveFullString((callbackExchange, body) -> { + httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata))); + }); + } catch (NoMetadataException exception) { + log.info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception); + httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST) + .getResponseSender() + .send(NO_METADATA_MESSAGE); + } catch (JsonParseException exception) { + log.info("Bad Request: Failure to parse metadata", exception); + httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST) + .getResponseSender() + .send(BAD_METADATA_MESSAGE); + } + } + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java new file mode 100644 index 0000000..77c8153 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java @@ -0,0 +1,31 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.datarouter; + + +import org.onap.dcaegen2.services.pmmapper.model.Event; + +/** + * Sink for Events received from the data router subscriber. + */ +public interface EventReceiver { + void receive(Event event); +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java new file mode 100644 index 0000000..280b9da --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.exceptions; + +public class NoMetadataException extends Exception { + public NoMetadataException(String errorMessage) { + super(errorMessage); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java new file mode 100644 index 0000000..922239b --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.exceptions; + +/** + * Exception indicates that a task has been attempted too many times. + */ +public class TooManyTriesException extends Exception { + public TooManyTriesException(String errorMessage){ + super(errorMessage); + } +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java new file mode 100644 index 0000000..a08dcfb --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.model; + +import io.undertow.server.HttpServerExchange; +import lombok.Data; +import lombok.NonNull; + +/** + * Class used to pass around relevant inbound event data. + */ +@Data +public class Event { + @NonNull + private HttpServerExchange httpServerExchange; + @NonNull + private String body; + @NonNull + private EventMetadata metadata; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java new file mode 100644 index 0000000..601b00f --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.model; + +import lombok.Data; +import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired; + +/** + * Metadata for inbound event onto data router subscriber. + */ +@Data +public class EventMetadata { + @GSONRequired + private String productName; + @GSONRequired + private String vendorName; + @GSONRequired + private String startEpochMicrosec; + @GSONRequired + private String lastEpochMicrosec; + @GSONRequired + private String sourceName; + @GSONRequired + private String timeZoneOffset; + @GSONRequired + private String location; + @GSONRequired + private String compression; + @GSONRequired + private String fileFormatType; + @GSONRequired + private String fileFormatVersion; +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java new file mode 100644 index 0000000..a6ce7b9 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.utils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation used to make a field required for Gson. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface GSONRequired { +} diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java new file mode 100644 index 0000000..e956398 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.pmmapper.utils; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; + +import java.lang.reflect.Field; +import java.lang.reflect.Type; + + +/** + * Extension of the default deserializer with support for GSONRequired annotation. + * @param <T> Type of object for deserialization process. + */ +public class RequiredFieldDeserializer<T> implements JsonDeserializer<T> { + + @Override + public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException { + T obj = new Gson().fromJson(jsonElement, type); + for (Field field : obj.getClass().getDeclaredFields()) { + if (field.getAnnotation(GSONRequired.class) != null) { + field.setAccessible(true); + try { + if (field.get(obj) == null) { + throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName())); + } + } catch (Exception exception) { + throw new JsonParseException("Failed to check fields.", exception); + } + } + } + + return obj; + } + +} |