summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java12
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java123
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/EventPublisher.java4
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java111
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/Publisher.java74
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java6
6 files changed, 320 insertions, 10 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 1209b38..e023c57 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,16 +36,14 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
-class DMaaPEventPublisher implements EventPublisher {
+public final class DMaaPEventPublisher implements EventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
- private final Logger outputLogger;
+ private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");;
- DMaaPEventPublisher(DMaaPPublishersCache publishersCache,
- Logger outputLogger) {
- this.publishersCache = publishersCache;
- this.outputLogger = outputLogger;
+ public DMaaPEventPublisher(Map<String, PublisherConfig> publishersCache) {
+ this.publishersCache = new DMaaPPublishersCache(publishersCache);
}
@Override
diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
new file mode 100755
index 0000000..5955a9d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+
+/**
+ * DmaapRequestConfiguration is DMaap request configuration.
+ */
+public class DmaapRequestConfiguration {
+
+ private static final Long TIMEOUT_SECONDS = 10L;
+ private static final int RETRY_INTERVAL_IN_SECONDS = 1;
+ private static final int RETRY_COUNT = 1;
+
+ private DmaapRequestConfiguration() {
+ }
+
+ /**
+ * Create publish request is DMaap request configuration.
+ * @param publisherConfig publisher configuration
+ * @param timeout timeout configuration
+ * return message reouter publish request
+ */
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
+ String topicUrl = createUrl(publisherConfig);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(createMessageRouterSink(topicUrl))
+ .contentType(ContentType.APPLICATION_JSON)
+ .timeoutConfig(timeOutConfiguration(timeout))
+ .build();
+ }
+
+ /**
+ * Create publish request is DMaap request configuration.
+ * @param publisherConfig publisher configuration
+ * return message reouter publish request
+ */
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
+ return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
+ }
+
+ /**
+ * Convert JSON object list.
+ * @param messages list of messages.
+ * return flux jsonobject list of messages.
+ */
+ static Flux<JsonObject> jsonBatch(List<String> messages) {
+ return Flux.fromIterable(getAsJsonObjects(messages));
+ }
+
+ /**
+ * Retry configuration.
+ * return MessageRouterPublisherConfig message router publish coinfiguration.
+ */
+ static MessageRouterPublisherConfig retryConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
+ .retryCount(RETRY_COUNT)
+ .build())
+ .build();
+ }
+
+ private static String createUrl(Option<PublisherConfig> publisherConfig) {
+ String hostAndPort = publisherConfig.get().getHostAndPort();
+ String topicName = publisherConfig.get().topic();
+ return String.format("http://%s/events/%s/",hostAndPort,topicName);
+ }
+
+ private static List<JsonObject> getAsJsonObjects(List<String> messages) {
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages) {
+ return messages.map(JsonParser::parseString);
+ }
+
+ static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+ return ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ @NotNull
+ private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
+ return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
index 91736ec..8042215 100644
--- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
public interface EventPublisher {
static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
- return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ return new DMaaPEventPublisher(dMaaPConfig);
}
void sendEvent(JSONObject event, String domain);
diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
new file mode 100755
index 0000000..4806127
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
@@ -0,0 +1,111 @@
+/*
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import java.util.Objects;
+
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+
+/**
+ * MessageRouterHttpStatusMapper is responsible for HTTP status mapper.
+ */
+public class MessageRouterHttpStatusMapper {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class);
+
+ private MessageRouterHttpStatusMapper() {
+ }
+
+ /**
+ * Get http status.
+ * @param messageRouterPublishResponse message reouter publish response.
+ */
+ @NotNull
+ static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return responseCompatibility.equals("v7.2") ?
+ getHttpStatusBackwardsCompatibility(messageRouterPublishResponse):
+ getHttpStatusWithMappedResponseCode(messageRouterPublishResponse);
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.ACCEPTED;
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ }
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.OK;
+ } else if (isHttp413(messageRouterPublishResponse)) {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ }
+ }
+
+ @NotNull
+ private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3);
+ }
+
+ @NotNull
+ private static ApiException responseBody(String substring) {
+ switch (substring) {
+ case "404":
+ return ApiException.NOT_FOUND;
+ case "408":
+ return ApiException.REQUEST_TIMEOUT;
+ case "429":
+ return ApiException.TOO_MANY_REQUESTS;
+ case "502":
+ return ApiException.BAD_GATEWAY;
+ case "503":
+ return ApiException.SERVICE_UNAVAILABLE;
+ case "504":
+ return ApiException.GATEWAY_TIMEOUT;
+ default:
+ return ApiException.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return messageRouterPublishResponse.successful();
+ }
+
+ private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413");
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
new file mode 100755
index 0000000..6f69f4f
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
+
+/**
+ * Publisher is responsible publish events.
+ */
+public class Publisher {
+
+ private final MessageRouterPublisher publisher;
+
+ /**
+ * Constructor
+ */
+ public Publisher() {
+ this(retryConfiguration());
+ }
+
+ /**
+ * Constructor
+ * @param messageRouterPublisherConfig message router publish configuration
+ */
+ public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(messageRouterPublisherConfig);
+ }
+
+ /**
+ * Publish event
+ *
+ * @param events list of ves events prepared to send
+ * @param publisherConfig publisher configuration
+ * @return flux containing information about the success or failure of the event publication
+ */
+ public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
+ return publishEvents(events, createPublishRequest(publisherConfig));
+ }
+
+ Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
+ return publisher.put(publishRequest, jsonMessageBatch);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
index 67aca1d..57210c4 100644
--- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,6 +51,10 @@ public final class PublisherConfig {
return destinations;
}
+ String getHostAndPort(){
+ return destinations.get(0);
+ }
+
String topic() {
return topic;
}