diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing')
8 files changed, 308 insertions, 257 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java index 274e4490..9f8ffcc6 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java @@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser { } private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) { - return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + return Try(() -> parseNewFormat(config)) .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); } - private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { - return dMaaPConfig.has("channels"); - } - - private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) { - return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List<String> destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); - } private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) { return root.keys().toMap( diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 2b4cfc15..08e16e0c 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,36 +21,29 @@ package org.onap.dcae.common.publishing; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import io.vavr.control.Try; -import org.onap.dcae.common.VESLogger; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import reactor.core.publisher.Flux; -import java.io.IOException; - -import static org.onap.dcae.common.publishing.VavrUtils.f; +import java.util.List; +import java.util.Objects; +import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public class DMaaPEventPublisher { - private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); - private DMaaPPublishersCache publishersCache; - private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); - - DMaaPEventPublisher(DMaaPPublishersCache publishersCache) { - this.publishersCache = publishersCache; - } + private Map<String, PublisherConfig> dMaaPConfig; + private final Publisher dmaapPublisher; public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) { - this(new DMaaPPublishersCache(dMaaPConfig)); + this.dMaaPConfig = dMaaPConfig; + dmaapPublisher = new Publisher(); } /** @@ -58,48 +51,29 @@ public class DMaaPEventPublisher { * @param dmaapConfiguration Dmaap configuration */ public void reload(Map<String, PublisherConfig> dmaapConfiguration){ - this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration); - } - - public void sendEvent(VesEvent vesEvent, String dmaapId){ - clearVesUniqueIdFromEvent(vesEvent); - publishersCache.getPublisher(dmaapId) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) - .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); - } - - private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) - .onFailure(exc -> closePublisher(event, dmaapId, exc)); + dMaaPConfig = dmaapConfiguration; + log.info("reload dmaap configuration"); } - private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) - throws IOException { - - String pk = event.getPK(); - int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); - if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { - log.info("Pending messages count: " + pendingMsgs); - } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); - log.info(infoMsg); - outputLogger.info(infoMsg); + public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) { + clearVesUniqueIdFromEvent(vesEvents); + io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents); + Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); + MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst(); + return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse)); } - private void closePublisher(VesEvent event, String dmaapId, Throwable e) { - log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, dmaapId), e); - publishersCache.closePublisherFor(dmaapId); + private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) { + return io.vavr.collection.List.ofAll(vesEvents) + .map(event -> event.asJsonObject().toString()); } - private void clearVesUniqueIdFromEvent(VesEvent event) { - if (event.hasType(VesEvent.VES_UNIQUE_ID)) { - String uuid = event.getUniqueId().toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.removeElement(VesEvent.VES_UNIQUE_ID); - } + private void clearVesUniqueIdFromEvent(List<VesEvent> events) { + events.stream() + .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)) + .forEach(event -> { + log.debug("Removing VESuniqueid object from event"); + event.removeElement(VesEvent.VES_UNIQUE_ID); + }); } } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java deleted file mode 100644 index a93073bf..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import io.vavr.control.Try; - -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -final class DMaaPPublishersBuilder { - - static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) { - return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); - } - - private static PublisherBuilder builder(PublisherConfig config) { - if (config.isSecured()) { - return authenticatedBuilder(config); - } else { - return unAuthenticatedBuilder(config); - } - } - - private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { - return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); - } - - private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { - return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); - } -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java deleted file mode 100644 index b7997ef9..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache<String, CambriaBatchingPublisher> publishersCache; - private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration; - - DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option<CambriaBatchingPublisher> getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { - Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); - Map<String, PublisherConfig> removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map<String, PublisherConfig> changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> { - - @Override - public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List<?> stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100644 index 00000000..2eaeab6a --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + static Flux<JsonObject> jsonBatch(List<String> messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option<PublisherConfig> publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List<JsonObject> getAsJsonObjects(List<String> messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List<JsonElement> getAsJsonElements(List<String> messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100644 index 00000000..b5c735b7 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new BackwardsCompatibilityException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new PayloadToLargeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse))); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100644 index 00000000..1d688d86 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +public class Publisher { + + private final MessageRouterPublisher publisher; + + public Publisher() { + this(retryConfiguration()); + } + + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) { + final Flux<JsonObject> jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 1fd0d316..0bb51922 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ public final class PublisherConfig { this.topic = topic; } + PublisherConfig(List<String> destinations, String topic, String userName, String password) { this.destinations = destinations; this.topic = topic; @@ -50,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } |