diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing')
6 files changed, 320 insertions, 10 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 1209b38..e023c57 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,16 +36,14 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ -class DMaaPEventPublisher implements EventPublisher { +public final class DMaaPEventPublisher implements EventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; - private final Logger outputLogger; + private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");; - DMaaPEventPublisher(DMaaPPublishersCache publishersCache, - Logger outputLogger) { - this.publishersCache = publishersCache; - this.outputLogger = outputLogger; + public DMaaPEventPublisher(Map<String, PublisherConfig> publishersCache) { + this.publishersCache = new DMaaPPublishersCache(publishersCache); } @Override diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100755 index 0000000..5955a9d --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +/** + * DmaapRequestConfiguration is DMaap request configuration. + */ +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * @param timeout timeout configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + /** + * Convert JSON object list. + * @param messages list of messages. + * return flux jsonobject list of messages. + */ + static Flux<JsonObject> jsonBatch(List<String> messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + /** + * Retry configuration. + * return MessageRouterPublisherConfig message router publish coinfiguration. + */ + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option<PublisherConfig> publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List<JsonObject> getAsJsonObjects(List<String> messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List<JsonElement> getAsJsonElements(List<String> messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java index 91736ec..8042215 100644 --- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import org.slf4j.Logger; public interface EventPublisher { static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) { - return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + return new DMaaPEventPublisher(dMaaPConfig); } void sendEvent(JSONObject event, String domain); diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100755 index 0000000..4806127 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,111 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +/** + * MessageRouterHttpStatusMapper is responsible for HTTP status mapper. + */ +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + /** + * Get http status. + * @param messageRouterPublishResponse message reouter publish response. + */ + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100755 index 0000000..6f69f4f --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +/** + * Publisher is responsible publish events. + */ +public class Publisher { + + private final MessageRouterPublisher publisher; + + /** + * Constructor + */ + public Publisher() { + this(retryConfiguration()); + } + + /** + * Constructor + * @param messageRouterPublisherConfig message router publish configuration + */ + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) { + final Flux<JsonObject> jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 67aca1d..57210c4 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } |