aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae')
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java49
-rw-r--r--src/main/java/org/onap/dcae/common/EventSender.java56
-rw-r--r--src/main/java/org/onap/dcae/common/VESLogger.java106
-rw-r--r--src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java23
-rw-r--r--src/main/java/org/onap/dcae/common/model/InternalException.java35
-rw-r--r--src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java23
-rw-r--r--src/main/java/org/onap/dcae/common/model/VesEvent.java14
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java20
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java84
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java61
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java121
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java101
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java107
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/Publisher.java64
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java7
-rw-r--r--src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java78
-rw-r--r--src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java1
-rw-r--r--src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java50
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiException.java16
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java51
20 files changed, 621 insertions, 446 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index 7bdef655..0acbbe26 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -1,9 +1,9 @@
/*
* ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 - 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2018 - 2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +21,6 @@
package org.onap.dcae;
-import static java.lang.String.format;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
@@ -30,26 +28,32 @@ import com.networknt.schema.JsonSchema;
import io.vavr.Function1;
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import javax.annotation.Nullable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.onap.dcae.common.EventTransformation;
import org.onap.dcae.common.configuration.AuthMethodType;
+import org.onap.dcae.multiplestreamreducer.MultipleStreamReducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static java.lang.String.format;
+
/**
* Abstraction over application configuration.
* Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties.
*/
public class ApplicationSettings {
+ public static String responseCompatibility;
+
private static final String EVENT_TRANSFORM_FILE_PATH = "./etc/eventTransform.json";
private static final String COULD_NOT_FIND_FILE = "Couldn't find file " + EVENT_TRANSFORM_FILE_PATH;
@@ -62,6 +66,7 @@ public class ApplicationSettings {
private final PropertiesConfiguration properties = new PropertiesConfiguration();
private final Map<String, JsonSchema> loadedJsonSchemas;
private final List<EventTransformation> eventTransformations;
+ private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer();
public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
this(args, argsParser, System.getProperty("user.dir"));
@@ -78,6 +83,7 @@ public class ApplicationSettings {
format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION));
loadedJsonSchemas = new JSonSchemasSupplier().loadJsonSchemas(collectorSchemaFile);
eventTransformations = loadEventTransformations();
+ responseCompatibility = getResponseCompatibilityFlag();
}
/**
@@ -155,7 +161,7 @@ public class ApplicationSettings {
}
public String dMaaPConfigurationFileLocation() {
- return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json"));
+ return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/ves-dmaap-config.json"));
}
public String certSubjectMatcher(){
@@ -166,13 +172,9 @@ public class ApplicationSettings {
return properties.getString("auth.method", AuthMethodType.NO_AUTH.value());
}
- public Map<String, String[]> getDmaapStreamIds() {
+ public Map<String, String> getDmaapStreamIds() {
String streamIdsProperty = properties.getString("collector.dmaap.streamid", null);
- if (streamIdsProperty == null) {
- return HashMap.empty();
- } else {
- return convertDMaaPStreamsPropertyToMap(streamIdsProperty);
- }
+ return streamIdsProperty == null ? HashMap.empty() : reduceStream(streamIdsProperty);
}
public boolean getExternalSchemaValidationCheckflag() {
@@ -203,6 +205,10 @@ public class ApplicationSettings {
return properties.getString("collector.description.api.version.location", "etc/api_version_description.json");
}
+ private String getResponseCompatibilityFlag() {
+ return properties.getString("collector.response.compatibility", "v7.2");
+ }
+
private void loadPropertiesFromFile() {
try {
properties.load(configurationFileLocation);
@@ -261,6 +267,13 @@ public class ApplicationSettings {
}
}
+ private Map<String, String> reduceStream(String streamIdsProperty) {
+ Map<String, String[]> dMaaPStreamsProperty = convertDMaaPStreamsPropertyToMap(streamIdsProperty);
+ final Map<String, String> domainToStreamConfig = multipleStreamReducer.reduce(dMaaPStreamsProperty);
+ log.warn(multipleStreamReducer.getDomainToStreamsInfo(domainToStreamConfig));
+ return domainToStreamConfig;
+ }
+
@VisibleForTesting
String getStringDirectly(String key) {
return properties.getString(key);
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java
index 81c463dc..400597ff 100644
--- a/src/main/java/org/onap/dcae/common/EventSender.java
+++ b/src/main/java/org/onap/dcae/common/EventSender.java
@@ -3,7 +3,7 @@
* VES Collector
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,52 +20,38 @@
*/
package org.onap.dcae.common;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
+
import io.vavr.collection.Map;
import org.onap.dcae.common.model.VesEvent;
import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.restapi.EventValidatorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
import java.util.List;
-public class EventSender {
+import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID;
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- private Map<String, String[]> streamIdToDmaapIds;
- private DMaaPEventPublisher eventPublisher;
- private static final Logger log = LoggerFactory.getLogger(EventSender.class);
- public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) {
- this.eventPublisher = eventPublisher;
- this.streamIdToDmaapIds = streamIdToDmaapIds;
- }
+public class EventSender {
- public void send(List<VesEvent> vesEvents) {
- for (VesEvent vesEvent : vesEvents) {
- metriclog.info("EVENT_PUBLISH_START");
- setLoggingContext(vesEvent);
- streamIdToDmaapIds.get(vesEvent.getStreamId())
- .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject()))
- .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds));
- log.debug("Message published" + vesEvent.asJsonObject());
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- metriclog.info("EVENT_PUBLISH_END");
- }
+ private Map<String, String> streamIdToDmaapIds;
+ private DMaaPEventPublisher eventPublisher;
+ private static final Logger log = LoggerFactory.getLogger(EventSender.class);
- private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) {
- for (String dmaapId : dmaapIds) {
- log.info("Invoking publisher for streamId/domain:" + dmaapId);
- eventPublisher.sendEvent(vesEvent, dmaapId);
+ public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String> streamIdToDmaapIds) {
+ this.eventPublisher = eventPublisher;
+ this.streamIdToDmaapIds = streamIdToDmaapIds;
}
- }
- private void setLoggingContext(VesEvent vesEvent) {
- LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString());
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain());
- }
+ public HttpStatus send(List<VesEvent> vesEvents) {
+ String topic = streamIdToDmaapIds
+ .get(vesEvents.get(0).getStreamId())
+ .getOrElse(() -> {
+ log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject());
+ throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID);
+ });
+ return eventPublisher.sendEvent(vesEvents, topic);
+ }
}
diff --git a/src/main/java/org/onap/dcae/common/VESLogger.java b/src/main/java/org/onap/dcae/common/VESLogger.java
deleted file mode 100644
index 1072fb54..00000000
--- a/src/main/java/org/onap/dcae/common/VESLogger.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.LoggingContextFactory;
-import com.att.nsa.logging.log4j.EcompFields;
-import java.util.UUID;
-
-public class VESLogger {
-
-
- public static final String REQUEST_ID = "requestId";
-
- // Common LoggingContext
- private static LoggingContext commonLC;
- // Thread-specific LoggingContext
- private static LoggingContext threadLC;
-
- /**
- * Returns the common LoggingContext instance that is the base context for
- * all subsequent instances.
- *
- * @return the common LoggingContext
- */
- public static LoggingContext getCommonLoggingContext() {
- if (commonLC == null) {
- commonLC = new LoggingContextFactory.Builder().build();
- final UUID uuid = UUID.randomUUID();
-
- commonLC.put(REQUEST_ID, uuid.toString());
- }
- return commonLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common
- * logging context. Populate the context with context-specific values.
- *
- * @param aUuid uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(UUID aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it
- // actually
- // helps prevent the thread from overwriting supposedly common data. It
- // also
- // should be fairly quick compared with the overhead of handling the
- // actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid.toString());
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
-
- return threadLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common
- * logging context. Populate the context with context-specific values.
- *
- * @param aUuid uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(String aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it
- // actually
- // helps prevent the thread from overwriting supposedly common data. It
- // also
- // should be fairly quick compared with the overhead of handling the
- // actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid);
- threadLC.put("statusCode", "COMPLETE");
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
- return threadLC;
- }
-
-}
diff --git a/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java
new file mode 100644
index 00000000..aab3c44b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java
@@ -0,0 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+public class BackwardsCompatibilityException extends RuntimeException {
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/common/model/InternalException.java b/src/main/java/org/onap/dcae/common/model/InternalException.java
new file mode 100644
index 00000000..da93e5db
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/model/InternalException.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+import org.onap.dcae.restapi.ApiException;
+
+public class InternalException extends RuntimeException {
+
+ private final ApiException apiException;
+
+ public InternalException(ApiException apiException) {
+ this.apiException = apiException;
+ }
+
+ public ApiException getApiException() {
+ return apiException;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java
new file mode 100644
index 00000000..e82ad775
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java
@@ -0,0 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+public class PayloadToLargeException extends RuntimeException {
+}
diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java
index 8e2db80e..88419555 100644
--- a/src/main/java/org/onap/dcae/common/model/VesEvent.java
+++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* VES Collector
* ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,9 @@ package org.onap.dcae.common.model;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONException;
import org.json.JSONObject;
+import java.util.Optional;
/**
* This class is a wrapper for JSONObject, that represents VES event.
@@ -112,6 +114,16 @@ public class VesEvent {
}
/**
+ * Returns optional stndDefinedNamespace name from VES event.
+ *
+ * @return Optional stndDefinedNamespace
+ */
+ public Optional<String> getStndDefinedNamespace() throws JSONException {
+ return isStdDefinedDomain(getDomain()) ? Optional.ofNullable(getEventHeader())
+ .map(header -> header.getString(STND_DEFINED_NAMESPACE)) : Optional.empty();
+ }
+
+ /**
* Checks if type of event is same as given in paramaters.
*
* @param type name that will be compared with event type
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
index 274e4490..9f8ffcc6 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
@@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser {
}
private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
- return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ return Try(() -> parseNewFormat(config))
.mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
}
- private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
- return dMaaPConfig.has("channels");
- }
-
- private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
- return root.get("channels").toList().toMap(
- channel -> channel.get("name").toString(),
- channel -> {
- String destinationsStr = channel.getAsOption("cambria.url")
- .getOrElse(channel.getAsOption("cambria.hosts").get())
- .toString();
- String topic = channel.get("cambria.topic").toString();
- Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
- Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
- List<String> destinations = List(destinationsStr.split(","));
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
- }
private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
return root.keys().toMap(
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 2b4cfc15..08e16e0c 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,36 +21,29 @@
package org.onap.dcae.common.publishing;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
-import io.vavr.control.Try;
-import org.onap.dcae.common.VESLogger;
import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Flux;
-import java.io.IOException;
-
-import static org.onap.dcae.common.publishing.VavrUtils.f;
+import java.util.List;
+import java.util.Objects;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
public class DMaaPEventPublisher {
- private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
- private DMaaPPublishersCache publishersCache;
- private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
-
- DMaaPEventPublisher(DMaaPPublishersCache publishersCache) {
- this.publishersCache = publishersCache;
- }
+ private Map<String, PublisherConfig> dMaaPConfig;
+ private final Publisher dmaapPublisher;
public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) {
- this(new DMaaPPublishersCache(dMaaPConfig));
+ this.dMaaPConfig = dMaaPConfig;
+ dmaapPublisher = new Publisher();
}
/**
@@ -58,48 +51,29 @@ public class DMaaPEventPublisher {
* @param dmaapConfiguration Dmaap configuration
*/
public void reload(Map<String, PublisherConfig> dmaapConfiguration){
- this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration);
- }
-
- public void sendEvent(VesEvent vesEvent, String dmaapId){
- clearVesUniqueIdFromEvent(vesEvent);
- publishersCache.getPublisher(dmaapId)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
- .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
- }
-
- private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
- Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
- .onFailure(exc -> closePublisher(event, dmaapId, exc));
+ dMaaPConfig = dmaapConfiguration;
+ log.info("reload dmaap configuration");
}
- private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
- throws IOException {
-
- String pk = event.getPK();
- int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
- if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
- log.info("Pending messages count: " + pendingMsgs);
- }
- String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
- log.info(infoMsg);
- outputLogger.info(infoMsg);
+ public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) {
+ clearVesUniqueIdFromEvent(vesEvents);
+ io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents);
+ Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
+ MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst();
+ return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse));
}
- private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
- log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, dmaapId), e);
- publishersCache.closePublisherFor(dmaapId);
+ private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) {
+ return io.vavr.collection.List.ofAll(vesEvents)
+ .map(event -> event.asJsonObject().toString());
}
- private void clearVesUniqueIdFromEvent(VesEvent event) {
- if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
- String uuid = event.getUniqueId().toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.removeElement(VesEvent.VES_UNIQUE_ID);
- }
+ private void clearVesUniqueIdFromEvent(List<VesEvent> events) {
+ events.stream()
+ .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID))
+ .forEach(event -> {
+ log.debug("Removing VESuniqueid object from event");
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
+ });
}
}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
deleted file mode 100644
index a93073bf..00000000
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import io.vavr.control.Try;
-
-import static io.vavr.API.Try;
-import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-final class DMaaPPublishersBuilder {
-
- static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
- return Try(() -> builder(config).build())
- .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
- }
-
- private static PublisherBuilder builder(PublisherConfig config) {
- if (config.isSecured()) {
- return authenticatedBuilder(config);
- } else {
- return unAuthenticatedBuilder(config);
- }
- }
-
- private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
- return unAuthenticatedBuilder(config)
- .usingHttps()
- .authenticatedByHttp(config.userName().get(), config.password().get());
- }
-
- private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
- return new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(config.destinations().mkString(","))
- .onTopic(config.topic())
- .logSendFailuresAfter(5);
- }
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
deleted file mode 100644
index b7997ef9..00000000
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.cache.*;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static io.vavr.API.Option;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-class DMaaPPublishersCache {
-
- private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
- private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
- private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
-
- DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
- this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
- this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(new OnPublisherRemovalListener())
- .build(new CambriaPublishersCacheLoader());
- }
-
- DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
- OnPublisherRemovalListener onPublisherRemovalListener,
- Map<String, PublisherConfig> dMaaPConfiguration) {
- this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
- this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(onPublisherRemovalListener)
- .build(dMaaPPublishersCacheLoader);
- }
-
- Option<CambriaBatchingPublisher> getPublisher(String streamID) {
- try {
- return Option(publishersCache.getUnchecked(streamID));
- } catch (Exception e) {
- log.warn("Could not create / load Cambria Publisher for streamID", e);
- return Option.none();
- }
- }
-
- void closePublisherFor(String streamId) {
- publishersCache.invalidate(streamId);
- }
-
- synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
- Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
- Map<String, PublisherConfig> removedConfigurations = currentConfig
- .filterKeys(domain -> !newConfig.containsKey(domain));
- Map<String, PublisherConfig> changedConfigurations = newConfig
- .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
- dMaaPConfiguration.set(newConfig);
- removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
- }
-
- static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
-
- @Override
- public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
- CambriaBatchingPublisher publisher = notification.getValue();
- if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
- try {
- int timeout = 20;
- TimeUnit unit = TimeUnit.SECONDS;
- java.util.List<?> stuck = publisher.close(timeout, unit);
- if (!stuck.isEmpty()) {
- log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
- + "%s messages were dropped", stuck.size(), timeout, unit));
- }
- } catch (InterruptedException | IOException e) {
- log.error("Could not close Cambria publisher, some messages might have been dropped", e);
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
-
- @Override
- public CambriaBatchingPublisher load(@Nonnull String domain) {
- return dMaaPConfiguration.get()
- .get(domain)
- .toTry(() -> new RuntimeException(
- f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
- .flatMap(DMaaPPublishersBuilder::buildPublisher)
- .get();
- }
- }
-
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
new file mode 100644
index 00000000..2eaeab6a
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+
+public class DmaapRequestConfiguration {
+
+ private static final Long TIMEOUT_SECONDS = 10L;
+ private static final int RETRY_INTERVAL_IN_SECONDS = 1;
+ private static final int RETRY_COUNT = 1;
+
+ private DmaapRequestConfiguration() {
+ }
+
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
+ String topicUrl = createUrl(publisherConfig);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(createMessageRouterSink(topicUrl))
+ .contentType(ContentType.APPLICATION_JSON)
+ .timeoutConfig(timeOutConfiguration(timeout))
+ .build();
+ }
+
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
+ return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
+ }
+
+ static Flux<JsonObject> jsonBatch(List<String> messages) {
+ return Flux.fromIterable(getAsJsonObjects(messages));
+ }
+
+ static MessageRouterPublisherConfig retryConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
+ .retryCount(RETRY_COUNT)
+ .build())
+ .build();
+ }
+
+ private static String createUrl(Option<PublisherConfig> publisherConfig) {
+ String hostAndPort = publisherConfig.get().getHostAndPort();
+ String topicName = publisherConfig.get().topic();
+ return String.format("http://%s/events/%s/",hostAndPort,topicName);
+ }
+
+ private static List<JsonObject> getAsJsonObjects(List<String> messages) {
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages) {
+ return messages.map(JsonParser::parseString);
+ }
+
+ static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+ return ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ @NotNull
+ private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
+ return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
new file mode 100644
index 00000000..b5c735b7
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import java.util.Objects;
+
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+
+public class MessageRouterHttpStatusMapper {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class);
+
+ private MessageRouterHttpStatusMapper() {
+ }
+
+ @NotNull
+ static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return responseCompatibility.equals("v7.2") ?
+ getHttpStatusBackwardsCompatibility(messageRouterPublishResponse):
+ getHttpStatusWithMappedResponseCode(messageRouterPublishResponse);
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.ACCEPTED;
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new BackwardsCompatibilityException();
+ }
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.OK;
+ } else if (isHttp413(messageRouterPublishResponse)) {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new PayloadToLargeException();
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse)));
+ }
+ }
+
+ @NotNull
+ private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3);
+ }
+
+ @NotNull
+ private static ApiException responseBody(String substring) {
+ switch (substring) {
+ case "404":
+ return ApiException.NOT_FOUND;
+ case "408":
+ return ApiException.REQUEST_TIMEOUT;
+ case "429":
+ return ApiException.TOO_MANY_REQUESTS;
+ case "502":
+ return ApiException.BAD_GATEWAY;
+ case "503":
+ return ApiException.SERVICE_UNAVAILABLE;
+ case "504":
+ return ApiException.GATEWAY_TIMEOUT;
+ default:
+ return ApiException.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return messageRouterPublishResponse.successful();
+ }
+
+ private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413");
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
new file mode 100644
index 00000000..1d688d86
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
+
+public class Publisher {
+
+ private final MessageRouterPublisher publisher;
+
+ public Publisher() {
+ this(retryConfiguration());
+ }
+
+ public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(messageRouterPublisherConfig);
+ }
+
+ /**
+ * Publish event
+ *
+ * @param events list of ves events prepared to send
+ * @param publisherConfig publisher configuration
+ * @return flux containing information about the success or failure of the event publication
+ */
+ public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
+ return publishEvents(events, createPublishRequest(publisherConfig));
+ }
+
+ Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
+ return publisher.put(publishRequest, jsonMessageBatch);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
index 1fd0d316..0bb51922 100644
--- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ public final class PublisherConfig {
this.topic = topic;
}
+
PublisherConfig(List<String> destinations, String topic, String userName, String password) {
this.destinations = destinations;
this.topic = topic;
@@ -50,6 +51,10 @@ public final class PublisherConfig {
return destinations;
}
+ String getHostAndPort(){
+ return destinations.get(0);
+ }
+
String topic() {
return topic;
}
diff --git a/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java
new file mode 100644
index 00000000..ea920740
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.validator;
+
+import io.vavr.control.Try;
+import org.json.JSONException;
+import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcae.restapi.EventValidatorException;
+
+import java.util.List;
+
+import static java.util.stream.Collectors.toSet;
+import static org.onap.dcae.restapi.ApiException.DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT;
+import static org.onap.dcae.restapi.ApiException.DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED;
+
+public class BatchEventValidator {
+
+ private BatchEventValidator() {
+ }
+
+ /**
+ * Check if value of domain fields are the same in every event,
+ * in case of stndDefined check stndDefinedNamespace fields
+ *
+ * @param events list of checked ves events
+ * @throws EventValidatorException when domain fields value or stndDefinedNamespace fields value are note the same
+ */
+ public static void executeBatchEventValidation(List<VesEvent> events) throws EventValidatorException {
+ if (hasNotEveryEventSameDomain(events)) {
+ throw new EventValidatorException(DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT);
+ }
+ if (isDomainStndDefined(events) && hasNotSameStndDefinedNamespace(events)) {
+ throw new EventValidatorException(DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED);
+ }
+ }
+
+ private static boolean hasNotEveryEventSameDomain(List<VesEvent> events) {
+ return events.stream()
+ .map(VesEvent::getDomain)
+ .collect(toSet())
+ .size() != 1;
+ }
+
+ private static boolean hasNotSameStndDefinedNamespace(List<VesEvent> events) {
+ return Try.of(() -> isAllStndDefinedNamespace(events))
+ .getOrElseThrow(() -> new EventValidatorException(ApiException.MISSING_NAMESPACE_PARAMETER));
+ }
+
+ private static boolean isAllStndDefinedNamespace(List<VesEvent> events) {
+ return events.stream()
+ .map(e -> e.getStndDefinedNamespace().orElse(""))
+ .collect(toSet())
+ .size() != 1;
+ }
+
+ private static boolean isDomainStndDefined(List<VesEvent> events) throws JSONException{
+ return events.stream()
+ .allMatch((e -> e.getDomain().equals("stndDefined")));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java
index 5cd5dc25..975db791 100644
--- a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java
+++ b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java
@@ -25,7 +25,6 @@ import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.common.model.VesEvent;
import org.onap.dcae.restapi.ApiException;
import org.onap.dcae.restapi.EventValidatorException;
-
/**
* This class is using ApplicationSetting and SchemaValidator to validate VES event.
*
diff --git a/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java
new file mode 100644
index 00000000..c03ab6bb
--- /dev/null
+++ b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.multiplestreamreducer;
+
+import io.vavr.Tuple2;
+import io.vavr.collection.Map;
+
+public class MultipleStreamReducer {
+
+ /**
+ * Converts configuration from: "one domain many streams"
+ * to: "one domain one stream"
+ *
+ * @param map domain to streams configuration
+ * @return configuration - one domain one stream
+ */
+ public Map<String, String> reduce(Map<String, String[]> map) {
+ return map.toStream()
+ .toMap(Tuple2::_1, v -> v._2[0]);
+ }
+
+ /**
+ * Information about the current match: domain to stream
+ *
+ * @param domainToStreamConfig domain to stream configuration
+ * @return current domain to stream information
+ */
+ public String getDomainToStreamsInfo(Map<String, String> domainToStreamConfig) {
+ return domainToStreamConfig.map(v -> "Domain: " +
+ v._1 + " has active stream: " + v._2 + System.lineSeparator())
+ .reduce((a, b) -> a + b);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java
index dbd41a4d..e819cbd6 100644
--- a/src/main/java/org/onap/dcae/restapi/ApiException.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiException.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,8 +40,18 @@ public enum ApiException {
NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503),
STND_DEFINED_VALIDATION_FAILED(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.data invalid against event.stndDefinedFields.schemaReference", "400"), 400),
NO_LOCAL_SCHEMA_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2004", "Invalid input value for %1 %2: %3", List.of("attribute", "event.stndDefinedFields.schemaReference", "Referred external schema not present in schema repository"), 400),
- INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400);
-
+ INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400),
+ DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT(ExceptionType.SERVICE_EXCEPTION, "SVC0001", "Different value of domain fields in Batch Event", 400),
+ DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED(ExceptionType.SERVICE_EXCEPTION, "SVC0001","Value of stndDefinedNamespace fields have to be same when domain is stndDefined",400),
+ DOMAIN_NOT_DEFINED_FOR_STREAM_ID(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("No domain defined for stream id", "400"), 400),
+ PAYLOAD_TO_LARGE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Entity Too Large", "413"), 413),
+ NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404),
+ REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408),
+ TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429),
+ INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500),
+ BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502),
+ SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503),
+ GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504);
public final int httpStatusCode;
private final ExceptionType type;
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
index 93e428b4..6c4fb8ef 100644
--- a/src/main/java/org/onap/dcae/restapi/VesRestController.java
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -3,7 +3,7 @@
* VES Collector
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,24 +21,24 @@
package org.onap.dcae.restapi;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
import org.json.JSONObject;
import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.common.EventSender;
import org.onap.dcae.common.EventUpdater;
import org.onap.dcae.common.HeaderUtils;
-import org.onap.dcae.common.validator.GeneralEventValidator;
-import org.onap.dcae.common.validator.StndDefinedDataValidator;
-import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
import org.onap.dcae.common.model.StndDefinedNamespaceParameterHasEmptyValueException;
import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException;
import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.common.validator.GeneralEventValidator;
+import org.onap.dcae.common.validator.StndDefinedDataValidator;
import org.onap.dcaegen2.services.sdk.standardization.header.CustomHeaderUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
@@ -50,8 +50,9 @@ import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.UUID;
-import static org.springframework.http.ResponseEntity.accepted;
+import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation;
import static org.springframework.http.ResponseEntity.badRequest;
+import static org.springframework.http.ResponseEntity.status;
@RestController
public class VesRestController {
@@ -113,22 +114,29 @@ public class VesRestController {
generalEventValidator.validate(vesEvent, type, version);
List<VesEvent> vesEvents = transformEvent(vesEvent, type, version, requestURI);
executeStndDefinedValidation(vesEvents);
- eventSender.send(vesEvents);
+ executeBatchEventValidation(vesEvents);
+ HttpStatus httpStatus = eventSender.send(vesEvents);
+ return status(httpStatus).contentType(MediaType.APPLICATION_JSON).body("Successfully send event");
} catch (EventValidatorException e) {
- logger.error(e.getMessage());
- return ResponseEntity.status(e.getApiException().httpStatusCode)
+ logger.error(e.getMessage());
+ return status(e.getApiException().httpStatusCode)
.body(e.getApiException().toJSON().toString());
} catch (StndDefinedNamespaceParameterNotDefinedException e) {
- return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
+ return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
.body(ApiException.MISSING_NAMESPACE_PARAMETER.toJSON().toString());
} catch (StndDefinedNamespaceParameterHasEmptyValueException e) {
- return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
+ return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
.body(ApiException.EMPTY_NAMESPACE_PARAMETER.toJSON().toString());
+ } catch (InternalException e) {
+ return status(ApiException.SERVICE_UNAVAILABLE.httpStatusCode)
+ .body(e.getApiException().toJSON().toString());
+ } catch (PayloadToLargeException e) {
+ return status(ApiException.PAYLOAD_TO_LARGE.httpStatusCode)
+ .body(ApiException.PAYLOAD_TO_LARGE.toJSON().toString());
+ } catch (BackwardsCompatibilityException e) {
+ return status(ApiException.INTERNAL_SERVER_ERROR.httpStatusCode)
+ .body(ApiException.INTERNAL_SERVER_ERROR.toJSON().toString());
}
-
- // TODO call service and return status, replace CambriaClient, split event to single object and list of them
- return accepted().headers(this.headerUtils.fillHeaders(headerUtils.getRspCustomHeader()))
- .contentType(MediaType.APPLICATION_JSON).body("Accepted");
}
private void executeStndDefinedValidation(List<VesEvent> vesEvents) {
@@ -142,7 +150,6 @@ public class VesRestController {
headerUtils.extractHeaders(request),
settings.getApiVersionDescriptionFilepath(),
headerUtils.getRestApiIdentify(request.getRequestURI()));
-
}
private List<VesEvent> transformEvent(VesEvent vesEvent, String type, String version, String requestURI) {
@@ -151,13 +158,7 @@ public class VesRestController {
private UUID generateUUID(VesEvent vesEvent, String version, String uri) {
UUID uuid = UUID.randomUUID();
- setUpECOMPLoggingForRequest(uuid);
requestLogger.info(String.format(VES_EVENT_MESSAGE, vesEvent.asJsonObject(), uuid, version, uri));
return uuid;
}
-
- private static void setUpECOMPLoggingForRequest(UUID uuid) {
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- }
-} \ No newline at end of file
+}