aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common/publishing
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java20
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java84
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java61
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java121
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java101
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java107
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/Publisher.java64
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java7
8 files changed, 308 insertions, 257 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
index 274e4490..9f8ffcc6 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
@@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser {
}
private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
- return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ return Try(() -> parseNewFormat(config))
.mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
}
- private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
- return dMaaPConfig.has("channels");
- }
-
- private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
- return root.get("channels").toList().toMap(
- channel -> channel.get("name").toString(),
- channel -> {
- String destinationsStr = channel.getAsOption("cambria.url")
- .getOrElse(channel.getAsOption("cambria.hosts").get())
- .toString();
- String topic = channel.get("cambria.topic").toString();
- Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
- Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
- List<String> destinations = List(destinationsStr.split(","));
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
- }
private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
return root.keys().toMap(
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 2b4cfc15..08e16e0c 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,36 +21,29 @@
package org.onap.dcae.common.publishing;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
-import io.vavr.control.Try;
-import org.onap.dcae.common.VESLogger;
import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Flux;
-import java.io.IOException;
-
-import static org.onap.dcae.common.publishing.VavrUtils.f;
+import java.util.List;
+import java.util.Objects;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
public class DMaaPEventPublisher {
- private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
- private DMaaPPublishersCache publishersCache;
- private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
-
- DMaaPEventPublisher(DMaaPPublishersCache publishersCache) {
- this.publishersCache = publishersCache;
- }
+ private Map<String, PublisherConfig> dMaaPConfig;
+ private final Publisher dmaapPublisher;
public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) {
- this(new DMaaPPublishersCache(dMaaPConfig));
+ this.dMaaPConfig = dMaaPConfig;
+ dmaapPublisher = new Publisher();
}
/**
@@ -58,48 +51,29 @@ public class DMaaPEventPublisher {
* @param dmaapConfiguration Dmaap configuration
*/
public void reload(Map<String, PublisherConfig> dmaapConfiguration){
- this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration);
- }
-
- public void sendEvent(VesEvent vesEvent, String dmaapId){
- clearVesUniqueIdFromEvent(vesEvent);
- publishersCache.getPublisher(dmaapId)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
- .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
- }
-
- private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
- Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
- .onFailure(exc -> closePublisher(event, dmaapId, exc));
+ dMaaPConfig = dmaapConfiguration;
+ log.info("reload dmaap configuration");
}
- private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
- throws IOException {
-
- String pk = event.getPK();
- int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
- if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
- log.info("Pending messages count: " + pendingMsgs);
- }
- String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
- log.info(infoMsg);
- outputLogger.info(infoMsg);
+ public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) {
+ clearVesUniqueIdFromEvent(vesEvents);
+ io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents);
+ Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
+ MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst();
+ return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse));
}
- private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
- log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, dmaapId), e);
- publishersCache.closePublisherFor(dmaapId);
+ private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) {
+ return io.vavr.collection.List.ofAll(vesEvents)
+ .map(event -> event.asJsonObject().toString());
}
- private void clearVesUniqueIdFromEvent(VesEvent event) {
- if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
- String uuid = event.getUniqueId().toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.removeElement(VesEvent.VES_UNIQUE_ID);
- }
+ private void clearVesUniqueIdFromEvent(List<VesEvent> events) {
+ events.stream()
+ .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID))
+ .forEach(event -> {
+ log.debug("Removing VESuniqueid object from event");
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
+ });
}
}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
deleted file mode 100644
index a93073bf..00000000
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import io.vavr.control.Try;
-
-import static io.vavr.API.Try;
-import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-final class DMaaPPublishersBuilder {
-
- static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
- return Try(() -> builder(config).build())
- .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
- }
-
- private static PublisherBuilder builder(PublisherConfig config) {
- if (config.isSecured()) {
- return authenticatedBuilder(config);
- } else {
- return unAuthenticatedBuilder(config);
- }
- }
-
- private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
- return unAuthenticatedBuilder(config)
- .usingHttps()
- .authenticatedByHttp(config.userName().get(), config.password().get());
- }
-
- private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
- return new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(config.destinations().mkString(","))
- .onTopic(config.topic())
- .logSendFailuresAfter(5);
- }
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
deleted file mode 100644
index b7997ef9..00000000
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.cache.*;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static io.vavr.API.Option;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-class DMaaPPublishersCache {
-
- private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
- private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
- private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
-
- DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
- this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
- this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(new OnPublisherRemovalListener())
- .build(new CambriaPublishersCacheLoader());
- }
-
- DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
- OnPublisherRemovalListener onPublisherRemovalListener,
- Map<String, PublisherConfig> dMaaPConfiguration) {
- this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
- this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(onPublisherRemovalListener)
- .build(dMaaPPublishersCacheLoader);
- }
-
- Option<CambriaBatchingPublisher> getPublisher(String streamID) {
- try {
- return Option(publishersCache.getUnchecked(streamID));
- } catch (Exception e) {
- log.warn("Could not create / load Cambria Publisher for streamID", e);
- return Option.none();
- }
- }
-
- void closePublisherFor(String streamId) {
- publishersCache.invalidate(streamId);
- }
-
- synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
- Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
- Map<String, PublisherConfig> removedConfigurations = currentConfig
- .filterKeys(domain -> !newConfig.containsKey(domain));
- Map<String, PublisherConfig> changedConfigurations = newConfig
- .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
- dMaaPConfiguration.set(newConfig);
- removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
- }
-
- static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
-
- @Override
- public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
- CambriaBatchingPublisher publisher = notification.getValue();
- if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
- try {
- int timeout = 20;
- TimeUnit unit = TimeUnit.SECONDS;
- java.util.List<?> stuck = publisher.close(timeout, unit);
- if (!stuck.isEmpty()) {
- log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
- + "%s messages were dropped", stuck.size(), timeout, unit));
- }
- } catch (InterruptedException | IOException e) {
- log.error("Could not close Cambria publisher, some messages might have been dropped", e);
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
-
- @Override
- public CambriaBatchingPublisher load(@Nonnull String domain) {
- return dMaaPConfiguration.get()
- .get(domain)
- .toTry(() -> new RuntimeException(
- f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
- .flatMap(DMaaPPublishersBuilder::buildPublisher)
- .get();
- }
- }
-
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
new file mode 100644
index 00000000..2eaeab6a
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+
+public class DmaapRequestConfiguration {
+
+ private static final Long TIMEOUT_SECONDS = 10L;
+ private static final int RETRY_INTERVAL_IN_SECONDS = 1;
+ private static final int RETRY_COUNT = 1;
+
+ private DmaapRequestConfiguration() {
+ }
+
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
+ String topicUrl = createUrl(publisherConfig);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(createMessageRouterSink(topicUrl))
+ .contentType(ContentType.APPLICATION_JSON)
+ .timeoutConfig(timeOutConfiguration(timeout))
+ .build();
+ }
+
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
+ return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
+ }
+
+ static Flux<JsonObject> jsonBatch(List<String> messages) {
+ return Flux.fromIterable(getAsJsonObjects(messages));
+ }
+
+ static MessageRouterPublisherConfig retryConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
+ .retryCount(RETRY_COUNT)
+ .build())
+ .build();
+ }
+
+ private static String createUrl(Option<PublisherConfig> publisherConfig) {
+ String hostAndPort = publisherConfig.get().getHostAndPort();
+ String topicName = publisherConfig.get().topic();
+ return String.format("http://%s/events/%s/",hostAndPort,topicName);
+ }
+
+ private static List<JsonObject> getAsJsonObjects(List<String> messages) {
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages) {
+ return messages.map(JsonParser::parseString);
+ }
+
+ static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+ return ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ @NotNull
+ private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
+ return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
new file mode 100644
index 00000000..b5c735b7
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import java.util.Objects;
+
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+
+public class MessageRouterHttpStatusMapper {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class);
+
+ private MessageRouterHttpStatusMapper() {
+ }
+
+ @NotNull
+ static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return responseCompatibility.equals("v7.2") ?
+ getHttpStatusBackwardsCompatibility(messageRouterPublishResponse):
+ getHttpStatusWithMappedResponseCode(messageRouterPublishResponse);
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.ACCEPTED;
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new BackwardsCompatibilityException();
+ }
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.OK;
+ } else if (isHttp413(messageRouterPublishResponse)) {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new PayloadToLargeException();
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse)));
+ }
+ }
+
+ @NotNull
+ private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3);
+ }
+
+ @NotNull
+ private static ApiException responseBody(String substring) {
+ switch (substring) {
+ case "404":
+ return ApiException.NOT_FOUND;
+ case "408":
+ return ApiException.REQUEST_TIMEOUT;
+ case "429":
+ return ApiException.TOO_MANY_REQUESTS;
+ case "502":
+ return ApiException.BAD_GATEWAY;
+ case "503":
+ return ApiException.SERVICE_UNAVAILABLE;
+ case "504":
+ return ApiException.GATEWAY_TIMEOUT;
+ default:
+ return ApiException.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return messageRouterPublishResponse.successful();
+ }
+
+ private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413");
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
new file mode 100644
index 00000000..1d688d86
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
+
+public class Publisher {
+
+ private final MessageRouterPublisher publisher;
+
+ public Publisher() {
+ this(retryConfiguration());
+ }
+
+ public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(messageRouterPublisherConfig);
+ }
+
+ /**
+ * Publish event
+ *
+ * @param events list of ves events prepared to send
+ * @param publisherConfig publisher configuration
+ * @return flux containing information about the success or failure of the event publication
+ */
+ public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
+ return publishEvents(events, createPublishRequest(publisherConfig));
+ }
+
+ Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
+ return publisher.put(publishRequest, jsonMessageBatch);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
index 1fd0d316..0bb51922 100644
--- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ public final class PublisherConfig {
this.topic = topic;
}
+
PublisherConfig(List<String> destinations, String topic, String userName, String password) {
this.destinations = destinations;
this.topic = topic;
@@ -50,6 +51,10 @@ public final class PublisherConfig {
return destinations;
}
+ String getHostAndPort(){
+ return destinations.get(0);
+ }
+
String topic() {
return topic;
}