diff options
author | mpriyank <priyank.maheshwari@est.tech> | 2024-01-23 15:48:07 +0000 |
---|---|---|
committer | mpriyank <priyank.maheshwari@est.tech> | 2024-01-25 16:02:25 +0000 |
commit | 33f7b3ef7b4f32ed33abd0890c71b48de2a17625 (patch) | |
tree | 853793452dd986024ac45dc428ce590b924e2c7d /cps-ncmp-service/src/main/java/org | |
parent | 52a7c2b2685ad63154e347782f0ff5c16f87ef53 (diff) |
CloudEvents support for cps-core
- Moving cloud events dependencies to the cps-service module instead of
cps-ncmp-service
- Testware has moved and new tests also introduced
- EventsPublisher also moved
Issue-ID: CPS-2040
Change-Id: Ibafb15e7e9efbdbe1e00e2b4f0da820bbcead004
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
7 files changed, 12 insertions, 148 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/AsyncRestRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/AsyncRestRequestResponseEventConsumer.java index 0044182ddd..993e3d63db 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/AsyncRestRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/AsyncRestRequestResponseEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2023 Nordix Foundation. + * Copyright (c) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.api.impl.async; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationEventConsumer.java index d62a09a659..9bb7fae4d9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.events.EventsPublisher; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java deleted file mode 100644 index 03d440e640..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.stereotype.Service; -import org.springframework.util.SerializationUtils; - -/** - * EventsPublisher to publish events. - */ - -@Slf4j -@Service -@RequiredArgsConstructor -public class EventsPublisher<T> { - - /** - * KafkaTemplate for legacy (non-cloud) events. - * Note: Cloud events should be used. This will be addressed as part of https://jira.onap.org/browse/CPS-1717. - */ - private final KafkaTemplate<String, T> legacyKafkaEventTemplate; - - private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate; - - /** - * Generic CloudEvent publisher. - * - * @param topicName valid topic name - * @param eventKey message key - * @param event message payload - */ - public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { - final CompletableFuture<SendResult<String, CloudEvent>> eventFuture = - cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.whenComplete((result, e) -> { - if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), - result.getProducerRecord().value()); - - } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); - } - }); - } - - /** - * Generic Event publisher. - * Note: Cloud events should be used. This will be addressed as part of https://jira.onap.org/browse/CPS-1717. - * - * @param topicName valid topic name - * @param eventKey message key - * @param event message payload - */ - public void publishEvent(final String topicName, final String eventKey, final T event) { - final CompletableFuture<SendResult<String, T>> eventFuture = - legacyKafkaEventTemplate.send(topicName, eventKey, event); - handleLegacyEventCallback(topicName, eventFuture); - } - - /** - * Generic Event Publisher with headers. - * - * @param topicName valid topic name - * @param eventKey message key - * @param eventHeaders event headers - * @param event message payload - */ - public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { - - final ProducerRecord<String, T> producerRecord = - new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final CompletableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord); - handleLegacyEventCallback(topicName, eventFuture); - } - - /** - * Generic Event Publisher with headers. - * - * @param topicName valid topic name - * @param eventKey message key - * @param eventHeaders map of event headers - * @param event message payload - */ - public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders, - final T event) { - - publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); - } - - private void handleLegacyEventCallback(final String topicName, - final CompletableFuture<SendResult<String, T>> eventFuture) { - eventFuture.whenComplete((result, e) -> { - if (e == null) { - log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), - result.getProducerRecord().value()); - } else { - log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); - } - }); - } - - private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) { - final Headers eventHeaders = new RecordHeaders(); - eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); - return eventHeaders; - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 88ebd35c88..f635f1a80b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2023 Nordix Foundation. + * Copyright (c) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.events.EventsPublisher; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java index cba1f76ff7..9bd1119588 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc; import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java index 2e1b914b1d..f51b58c3ef 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import io.micrometer.core.annotation.Timed; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; import org.onap.cps.utils.JsonObjectMapper; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index e78f0901fe..f13c842b25 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,8 +35,8 @@ import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.api.NcmpResponseStatus; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.inventory.CmHandleState; import org.onap.cps.ncmp.api.impl.operations.CmHandle; import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation; |