From 74b598291ed2461e0e482f556baf2943a97a54f2 Mon Sep 17 00:00:00 2001 From: Maciej Malewski Date: Tue, 8 Jun 2021 09:04:48 +0200 Subject: Replace cambria with DmaaP client - remove cambria, add DmaaP client - sending event for many topics at once is no longer supported - add backward compatibility status codes - add additional validation for batchEvent Issue-ID: DCAEGEN2-1483 Signed-off-by: Maciej Malewski Change-Id: I945c38b4ab04b697ecfabd5ce38502f83fa70d1a --- .../java/org/onap/dcae/ApplicationSettings.java | 49 ++++++--- .../java/org/onap/dcae/common/EventSender.java | 56 ++++------ src/main/java/org/onap/dcae/common/VESLogger.java | 106 ------------------ .../model/BackwardsCompatibilityException.java | 23 ++++ .../onap/dcae/common/model/InternalException.java | 35 ++++++ .../dcae/common/model/PayloadToLargeException.java | 23 ++++ .../java/org/onap/dcae/common/model/VesEvent.java | 14 ++- .../publishing/DMaaPConfigurationParser.java | 20 +--- .../common/publishing/DMaaPEventPublisher.java | 84 +++++--------- .../common/publishing/DMaaPPublishersBuilder.java | 61 ----------- .../common/publishing/DMaaPPublishersCache.java | 121 --------------------- .../publishing/DmaapRequestConfiguration.java | 101 +++++++++++++++++ .../publishing/MessageRouterHttpStatusMapper.java | 107 ++++++++++++++++++ .../org/onap/dcae/common/publishing/Publisher.java | 64 +++++++++++ .../dcae/common/publishing/PublisherConfig.java | 7 +- .../dcae/common/validator/BatchEventValidator.java | 78 +++++++++++++ .../common/validator/GeneralEventValidator.java | 1 - .../MultipleStreamReducer.java | 50 +++++++++ .../java/org/onap/dcae/restapi/ApiException.java | 16 ++- .../org/onap/dcae/restapi/VesRestController.java | 51 ++++----- 20 files changed, 621 insertions(+), 446 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/common/VESLogger.java create mode 100644 src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java create mode 100644 src/main/java/org/onap/dcae/common/model/InternalException.java create mode 100644 src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java delete mode 100644 src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java delete mode 100644 src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java create mode 100644 src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java create mode 100644 src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java create mode 100644 src/main/java/org/onap/dcae/common/publishing/Publisher.java create mode 100644 src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java create mode 100644 src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java (limited to 'src/main/java/org/onap/dcae') diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 7bdef655..0acbbe26 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -1,9 +1,9 @@ /* * ============LICENSE_START======================================================= - * PROJECT + * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 - 2020 Nokia. All rights reserved.s + * Copyright (C) 2018 - 2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ package org.onap.dcae; -import static java.lang.String.format; - import com.google.common.annotations.VisibleForTesting; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -30,26 +28,32 @@ import com.networknt.schema.JsonSchema; import io.vavr.Function1; import io.vavr.collection.HashMap; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import javax.annotation.Nullable; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.onap.dcae.common.EventTransformation; import org.onap.dcae.common.configuration.AuthMethodType; +import org.onap.dcae.multiplestreamreducer.MultipleStreamReducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static java.lang.String.format; + /** * Abstraction over application configuration. * Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties. */ public class ApplicationSettings { + public static String responseCompatibility; + private static final String EVENT_TRANSFORM_FILE_PATH = "./etc/eventTransform.json"; private static final String COULD_NOT_FIND_FILE = "Couldn't find file " + EVENT_TRANSFORM_FILE_PATH; @@ -62,6 +66,7 @@ public class ApplicationSettings { private final PropertiesConfiguration properties = new PropertiesConfiguration(); private final Map loadedJsonSchemas; private final List eventTransformations; + private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer(); public ApplicationSettings(String[] args, Function1> argsParser) { this(args, argsParser, System.getProperty("user.dir")); @@ -78,6 +83,7 @@ public class ApplicationSettings { format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)); loadedJsonSchemas = new JSonSchemasSupplier().loadJsonSchemas(collectorSchemaFile); eventTransformations = loadEventTransformations(); + responseCompatibility = getResponseCompatibilityFlag(); } /** @@ -155,7 +161,7 @@ public class ApplicationSettings { } public String dMaaPConfigurationFileLocation() { - return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); + return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/ves-dmaap-config.json")); } public String certSubjectMatcher(){ @@ -166,13 +172,9 @@ public class ApplicationSettings { return properties.getString("auth.method", AuthMethodType.NO_AUTH.value()); } - public Map getDmaapStreamIds() { + public Map getDmaapStreamIds() { String streamIdsProperty = properties.getString("collector.dmaap.streamid", null); - if (streamIdsProperty == null) { - return HashMap.empty(); - } else { - return convertDMaaPStreamsPropertyToMap(streamIdsProperty); - } + return streamIdsProperty == null ? HashMap.empty() : reduceStream(streamIdsProperty); } public boolean getExternalSchemaValidationCheckflag() { @@ -203,6 +205,10 @@ public class ApplicationSettings { return properties.getString("collector.description.api.version.location", "etc/api_version_description.json"); } + private String getResponseCompatibilityFlag() { + return properties.getString("collector.response.compatibility", "v7.2"); + } + private void loadPropertiesFromFile() { try { properties.load(configurationFileLocation); @@ -261,6 +267,13 @@ public class ApplicationSettings { } } + private Map reduceStream(String streamIdsProperty) { + Map dMaaPStreamsProperty = convertDMaaPStreamsPropertyToMap(streamIdsProperty); + final Map domainToStreamConfig = multipleStreamReducer.reduce(dMaaPStreamsProperty); + log.warn(multipleStreamReducer.getDomainToStreamsInfo(domainToStreamConfig)); + return domainToStreamConfig; + } + @VisibleForTesting String getStringDirectly(String key) { return properties.getString(key); diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 81c463dc..400597ff 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,52 +20,38 @@ */ package org.onap.dcae.common; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; + import io.vavr.collection.Map; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.restapi.EventValidatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import java.util.List; -public class EventSender { +import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID; - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - private Map streamIdToDmaapIds; - private DMaaPEventPublisher eventPublisher; - private static final Logger log = LoggerFactory.getLogger(EventSender.class); - public EventSender(DMaaPEventPublisher eventPublisher, Map streamIdToDmaapIds) { - this.eventPublisher = eventPublisher; - this.streamIdToDmaapIds = streamIdToDmaapIds; - } +public class EventSender { - public void send(List vesEvents) { - for (VesEvent vesEvent : vesEvents) { - metriclog.info("EVENT_PUBLISH_START"); - setLoggingContext(vesEvent); - streamIdToDmaapIds.get(vesEvent.getStreamId()) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); - log.debug("Message published" + vesEvent.asJsonObject()); - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metriclog.info("EVENT_PUBLISH_END"); - } + private Map streamIdToDmaapIds; + private DMaaPEventPublisher eventPublisher; + private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { - for (String dmaapId : dmaapIds) { - log.info("Invoking publisher for streamId/domain:" + dmaapId); - eventPublisher.sendEvent(vesEvent, dmaapId); + public EventSender(DMaaPEventPublisher eventPublisher, Map streamIdToDmaapIds) { + this.eventPublisher = eventPublisher; + this.streamIdToDmaapIds = streamIdToDmaapIds; } - } - private void setLoggingContext(VesEvent vesEvent) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain()); - } + public HttpStatus send(List vesEvents) { + String topic = streamIdToDmaapIds + .get(vesEvents.get(0).getStreamId()) + .getOrElse(() -> { + log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject()); + throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID); + }); + return eventPublisher.sendEvent(vesEvents, topic); + } } diff --git a/src/main/java/org/onap/dcae/common/VESLogger.java b/src/main/java/org/onap/dcae/common/VESLogger.java deleted file mode 100644 index 1072fb54..00000000 --- a/src/main/java/org/onap/dcae/common/VESLogger.java +++ /dev/null @@ -1,106 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.LoggingContextFactory; -import com.att.nsa.logging.log4j.EcompFields; -import java.util.UUID; - -public class VESLogger { - - - public static final String REQUEST_ID = "requestId"; - - // Common LoggingContext - private static LoggingContext commonLC; - // Thread-specific LoggingContext - private static LoggingContext threadLC; - - /** - * Returns the common LoggingContext instance that is the base context for - * all subsequent instances. - * - * @return the common LoggingContext - */ - public static LoggingContext getCommonLoggingContext() { - if (commonLC == null) { - commonLC = new LoggingContextFactory.Builder().build(); - final UUID uuid = UUID.randomUUID(); - - commonLC.put(REQUEST_ID, uuid.toString()); - } - return commonLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(UUID aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid.toString()); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - - return threadLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(String aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid); - threadLC.put("statusCode", "COMPLETE"); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - return threadLC; - } - -} diff --git a/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java new file mode 100644 index 00000000..aab3c44b --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java @@ -0,0 +1,23 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +public class BackwardsCompatibilityException extends RuntimeException { +} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/model/InternalException.java b/src/main/java/org/onap/dcae/common/model/InternalException.java new file mode 100644 index 00000000..da93e5db --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/InternalException.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +import org.onap.dcae.restapi.ApiException; + +public class InternalException extends RuntimeException { + + private final ApiException apiException; + + public InternalException(ApiException apiException) { + this.apiException = apiException; + } + + public ApiException getApiException() { + return apiException; + } +} diff --git a/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java new file mode 100644 index 00000000..e82ad775 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java @@ -0,0 +1,23 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +public class PayloadToLargeException extends RuntimeException { +} diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java index 8e2db80e..88419555 100644 --- a/src/main/java/org/onap/dcae/common/model/VesEvent.java +++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * VES Collector * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved.s + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ package org.onap.dcae.common.model; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONException; import org.json.JSONObject; +import java.util.Optional; /** * This class is a wrapper for JSONObject, that represents VES event. @@ -111,6 +113,16 @@ public class VesEvent { return event.get(VES_UNIQUE_ID); } + /** + * Returns optional stndDefinedNamespace name from VES event. + * + * @return Optional stndDefinedNamespace + */ + public Optional getStndDefinedNamespace() throws JSONException { + return isStdDefinedDomain(getDomain()) ? Optional.ofNullable(getEventHeader()) + .map(header -> header.getString(STND_DEFINED_NAMESPACE)) : Optional.empty(); + } + /** * Checks if type of event is same as given in paramaters. * diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java index 274e4490..9f8ffcc6 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java @@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser { } private static Try> toConfigMap(AnyNode config) { - return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + return Try(() -> parseNewFormat(config)) .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); } - private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { - return dMaaPConfig.has("channels"); - } - - private static Map parseLegacyFormat(AnyNode root) { - return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); - } private static Map parseNewFormat(AnyNode root) { return root.keys().toMap( diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 2b4cfc15..08e16e0c 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,36 +21,29 @@ package org.onap.dcae.common.publishing; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import io.vavr.control.Try; -import org.onap.dcae.common.VESLogger; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import reactor.core.publisher.Flux; -import java.io.IOException; - -import static org.onap.dcae.common.publishing.VavrUtils.f; +import java.util.List; +import java.util.Objects; +import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public class DMaaPEventPublisher { - private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); - private DMaaPPublishersCache publishersCache; - private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); - - DMaaPEventPublisher(DMaaPPublishersCache publishersCache) { - this.publishersCache = publishersCache; - } + private Map dMaaPConfig; + private final Publisher dmaapPublisher; public DMaaPEventPublisher(Map dMaaPConfig) { - this(new DMaaPPublishersCache(dMaaPConfig)); + this.dMaaPConfig = dMaaPConfig; + dmaapPublisher = new Publisher(); } /** @@ -58,48 +51,29 @@ public class DMaaPEventPublisher { * @param dmaapConfiguration Dmaap configuration */ public void reload(Map dmaapConfiguration){ - this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration); - } - - public void sendEvent(VesEvent vesEvent, String dmaapId){ - clearVesUniqueIdFromEvent(vesEvent); - publishersCache.getPublisher(dmaapId) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) - .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); - } - - private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) - .onFailure(exc -> closePublisher(event, dmaapId, exc)); + dMaaPConfig = dmaapConfiguration; + log.info("reload dmaap configuration"); } - private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) - throws IOException { - - String pk = event.getPK(); - int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); - if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { - log.info("Pending messages count: " + pendingMsgs); - } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); - log.info(infoMsg); - outputLogger.info(infoMsg); + public HttpStatus sendEvent(List vesEvents, String dmaapId) { + clearVesUniqueIdFromEvent(vesEvents); + io.vavr.collection.List events = mapListOfEventsToVavrList(vesEvents); + Flux messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); + MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst(); + return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse)); } - private void closePublisher(VesEvent event, String dmaapId, Throwable e) { - log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, dmaapId), e); - publishersCache.closePublisherFor(dmaapId); + private io.vavr.collection.List mapListOfEventsToVavrList(List vesEvents) { + return io.vavr.collection.List.ofAll(vesEvents) + .map(event -> event.asJsonObject().toString()); } - private void clearVesUniqueIdFromEvent(VesEvent event) { - if (event.hasType(VesEvent.VES_UNIQUE_ID)) { - String uuid = event.getUniqueId().toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.removeElement(VesEvent.VES_UNIQUE_ID); - } + private void clearVesUniqueIdFromEvent(List events) { + events.stream() + .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)) + .forEach(event -> { + log.debug("Removing VESuniqueid object from event"); + event.removeElement(VesEvent.VES_UNIQUE_ID); + }); } } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java deleted file mode 100644 index a93073bf..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import io.vavr.control.Try; - -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -final class DMaaPPublishersBuilder { - - static Try buildPublisher(PublisherConfig config) { - return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); - } - - private static PublisherBuilder builder(PublisherConfig config) { - if (config.isSecured()) { - return authenticatedBuilder(config); - } else { - return unAuthenticatedBuilder(config); - } - } - - private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { - return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); - } - - private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { - return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); - } -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java deleted file mode 100644 index b7997ef9..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache publishersCache; - private AtomicReference> dMaaPConfiguration; - - DMaaPPublishersCache(Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map newConfig) { - Map currentConfig = dMaaPConfiguration.get(); - Map removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener { - - @Override - public void onRemoval(@Nonnull RemovalNotification notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100644 index 00000000..2eaeab6a --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + static MessageRouterPublishRequest createPublishRequest(Option publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + static MessageRouterPublishRequest createPublishRequest(Option publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + static Flux jsonBatch(List messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List getAsJsonObjects(List messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List getAsJsonElements(List messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100644 index 00000000..b5c735b7 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new BackwardsCompatibilityException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new PayloadToLargeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse))); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100644 index 00000000..1d688d86 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +public class Publisher { + + private final MessageRouterPublisher publisher; + + public Publisher() { + this(retryConfiguration()); + } + + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux publishEvents(List events, Option publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux publishEvents(List events, MessageRouterPublishRequest publishRequest) { + final Flux jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 1fd0d316..0bb51922 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ public final class PublisherConfig { this.topic = topic; } + PublisherConfig(List destinations, String topic, String userName, String password) { this.destinations = destinations; this.topic = topic; @@ -50,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } diff --git a/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java new file mode 100644 index 00000000..ea920740 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.validator; + +import io.vavr.control.Try; +import org.json.JSONException; +import org.onap.dcae.common.model.VesEvent; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcae.restapi.EventValidatorException; + +import java.util.List; + +import static java.util.stream.Collectors.toSet; +import static org.onap.dcae.restapi.ApiException.DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT; +import static org.onap.dcae.restapi.ApiException.DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED; + +public class BatchEventValidator { + + private BatchEventValidator() { + } + + /** + * Check if value of domain fields are the same in every event, + * in case of stndDefined check stndDefinedNamespace fields + * + * @param events list of checked ves events + * @throws EventValidatorException when domain fields value or stndDefinedNamespace fields value are note the same + */ + public static void executeBatchEventValidation(List events) throws EventValidatorException { + if (hasNotEveryEventSameDomain(events)) { + throw new EventValidatorException(DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT); + } + if (isDomainStndDefined(events) && hasNotSameStndDefinedNamespace(events)) { + throw new EventValidatorException(DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED); + } + } + + private static boolean hasNotEveryEventSameDomain(List events) { + return events.stream() + .map(VesEvent::getDomain) + .collect(toSet()) + .size() != 1; + } + + private static boolean hasNotSameStndDefinedNamespace(List events) { + return Try.of(() -> isAllStndDefinedNamespace(events)) + .getOrElseThrow(() -> new EventValidatorException(ApiException.MISSING_NAMESPACE_PARAMETER)); + } + + private static boolean isAllStndDefinedNamespace(List events) { + return events.stream() + .map(e -> e.getStndDefinedNamespace().orElse("")) + .collect(toSet()) + .size() != 1; + } + + private static boolean isDomainStndDefined(List events) throws JSONException{ + return events.stream() + .allMatch((e -> e.getDomain().equals("stndDefined"))); + } +} diff --git a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java index 5cd5dc25..975db791 100644 --- a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java +++ b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java @@ -25,7 +25,6 @@ import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.restapi.ApiException; import org.onap.dcae.restapi.EventValidatorException; - /** * This class is using ApplicationSetting and SchemaValidator to validate VES event. * diff --git a/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java new file mode 100644 index 00000000..c03ab6bb --- /dev/null +++ b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.multiplestreamreducer; + +import io.vavr.Tuple2; +import io.vavr.collection.Map; + +public class MultipleStreamReducer { + + /** + * Converts configuration from: "one domain many streams" + * to: "one domain one stream" + * + * @param map domain to streams configuration + * @return configuration - one domain one stream + */ + public Map reduce(Map map) { + return map.toStream() + .toMap(Tuple2::_1, v -> v._2[0]); + } + + /** + * Information about the current match: domain to stream + * + * @param domainToStreamConfig domain to stream configuration + * @return current domain to stream information + */ + public String getDomainToStreamsInfo(Map domainToStreamConfig) { + return domainToStreamConfig.map(v -> "Domain: " + + v._1 + " has active stream: " + v._2 + System.lineSeparator()) + .reduce((a, b) -> a + b); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java index dbd41a4d..e819cbd6 100644 --- a/src/main/java/org/onap/dcae/restapi/ApiException.java +++ b/src/main/java/org/onap/dcae/restapi/ApiException.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,8 +40,18 @@ public enum ApiException { NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503), STND_DEFINED_VALIDATION_FAILED(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.data invalid against event.stndDefinedFields.schemaReference", "400"), 400), NO_LOCAL_SCHEMA_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2004", "Invalid input value for %1 %2: %3", List.of("attribute", "event.stndDefinedFields.schemaReference", "Referred external schema not present in schema repository"), 400), - INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400); - + INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400), + DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT(ExceptionType.SERVICE_EXCEPTION, "SVC0001", "Different value of domain fields in Batch Event", 400), + DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED(ExceptionType.SERVICE_EXCEPTION, "SVC0001","Value of stndDefinedNamespace fields have to be same when domain is stndDefined",400), + DOMAIN_NOT_DEFINED_FOR_STREAM_ID(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("No domain defined for stream id", "400"), 400), + PAYLOAD_TO_LARGE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Entity Too Large", "413"), 413), + NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404), + REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408), + TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429), + INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500), + BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502), + SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503), + GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504); public final int httpStatusCode; private final ExceptionType type; diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 93e428b4..6c4fb8ef 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,24 +21,24 @@ package org.onap.dcae.restapi; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import org.json.JSONObject; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.EventUpdater; import org.onap.dcae.common.HeaderUtils; -import org.onap.dcae.common.validator.GeneralEventValidator; -import org.onap.dcae.common.validator.StndDefinedDataValidator; -import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; import org.onap.dcae.common.model.StndDefinedNamespaceParameterHasEmptyValueException; import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcae.common.validator.GeneralEventValidator; +import org.onap.dcae.common.validator.StndDefinedDataValidator; import org.onap.dcaegen2.services.sdk.standardization.header.CustomHeaderUtils; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; @@ -50,8 +50,9 @@ import javax.servlet.http.HttpServletRequest; import java.util.List; import java.util.UUID; -import static org.springframework.http.ResponseEntity.accepted; +import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation; import static org.springframework.http.ResponseEntity.badRequest; +import static org.springframework.http.ResponseEntity.status; @RestController public class VesRestController { @@ -113,22 +114,29 @@ public class VesRestController { generalEventValidator.validate(vesEvent, type, version); List vesEvents = transformEvent(vesEvent, type, version, requestURI); executeStndDefinedValidation(vesEvents); - eventSender.send(vesEvents); + executeBatchEventValidation(vesEvents); + HttpStatus httpStatus = eventSender.send(vesEvents); + return status(httpStatus).contentType(MediaType.APPLICATION_JSON).body("Successfully send event"); } catch (EventValidatorException e) { - logger.error(e.getMessage()); - return ResponseEntity.status(e.getApiException().httpStatusCode) + logger.error(e.getMessage()); + return status(e.getApiException().httpStatusCode) .body(e.getApiException().toJSON().toString()); } catch (StndDefinedNamespaceParameterNotDefinedException e) { - return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) + return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) .body(ApiException.MISSING_NAMESPACE_PARAMETER.toJSON().toString()); } catch (StndDefinedNamespaceParameterHasEmptyValueException e) { - return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) + return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) .body(ApiException.EMPTY_NAMESPACE_PARAMETER.toJSON().toString()); + } catch (InternalException e) { + return status(ApiException.SERVICE_UNAVAILABLE.httpStatusCode) + .body(e.getApiException().toJSON().toString()); + } catch (PayloadToLargeException e) { + return status(ApiException.PAYLOAD_TO_LARGE.httpStatusCode) + .body(ApiException.PAYLOAD_TO_LARGE.toJSON().toString()); + } catch (BackwardsCompatibilityException e) { + return status(ApiException.INTERNAL_SERVER_ERROR.httpStatusCode) + .body(ApiException.INTERNAL_SERVER_ERROR.toJSON().toString()); } - - // TODO call service and return status, replace CambriaClient, split event to single object and list of them - return accepted().headers(this.headerUtils.fillHeaders(headerUtils.getRspCustomHeader())) - .contentType(MediaType.APPLICATION_JSON).body("Accepted"); } private void executeStndDefinedValidation(List vesEvents) { @@ -142,7 +150,6 @@ public class VesRestController { headerUtils.extractHeaders(request), settings.getApiVersionDescriptionFilepath(), headerUtils.getRestApiIdentify(request.getRequestURI())); - } private List transformEvent(VesEvent vesEvent, String type, String version, String requestURI) { @@ -151,13 +158,7 @@ public class VesRestController { private UUID generateUUID(VesEvent vesEvent, String version, String uri) { UUID uuid = UUID.randomUUID(); - setUpECOMPLoggingForRequest(uuid); requestLogger.info(String.format(VES_EVENT_MESSAGE, vesEvent.asJsonObject(), uuid, version, uri)); return uuid; } - - private static void setUpECOMPLoggingForRequest(UUID uuid) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - } -} \ No newline at end of file +} -- cgit 1.2.3-korg