From b3b44d9aa4e71c4264df140ffff27b5ba4c44e82 Mon Sep 17 00:00:00 2001 From: "halil.cakal" Date: Tue, 6 Jun 2023 16:20:55 +0100 Subject: Agreed outstanding comments including package refactoring - Change package of AVC Subscription relevent codes into their place - Change package of Subscription Event Cache config inot its place - Add more branches for subscription outcome mapper - Add more branches for subscription event response consumer - Change unit test method params in order not to use deprecated methods Issue-ID: CPS-1730 Change-Id: Ieda587d5be318db8360d52d49dc38d7ce3dd85cd Signed-off-by: halil.cakal --- .../ForwardedSubscriptionEventCacheConfig.java | 52 ++++++++++++ .../avc/ForwardedSubscriptionEventCacheConfig.java | 52 ------------ .../api/impl/event/avc/ResponseTimeoutTask.java | 59 ------------- .../avc/SubscriptionEventResponseConsumer.java | 96 ---------------------- .../impl/event/avc/SubscriptionOutcomeMapper.java | 88 -------------------- .../avcsubscription/ResponseTimeoutTask.java | 58 +++++++++++++ .../SubscriptionEventForwarder.java | 3 +- .../SubscriptionEventResponseConsumer.java | 95 +++++++++++++++++++++ .../SubscriptionEventResponseOutcome.java | 1 - .../avcsubscription/SubscriptionOutcomeMapper.java | 88 ++++++++++++++++++++ 10 files changed, 294 insertions(+), 298 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/ForwardedSubscriptionEventCacheConfig.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java (limited to 'cps-ncmp-service/src/main/java/org/onap') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/ForwardedSubscriptionEventCacheConfig.java new file mode 100644 index 0000000000..306d103a8f --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/ForwardedSubscriptionEventCacheConfig.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.embeddedcache; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.map.IMap; +import java.util.Set; +import org.onap.cps.cache.HazelcastCacheConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases. + */ +@Configuration +public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { + + public static final int SUBSCRIPTION_FORWARD_STARTED_TTL_SECS = 600; + + private static final MapConfig forwardedSubscriptionEventCacheMapConfig = + createMapConfig("forwardedSubscriptionEventCacheMapConfig"); + + /** + * Distributed instance of forwarded subscription information cache that contains subscription event + * id by dmi names as properties. + * + * @return configured map of subscription event ids as keys to sets of dmi names for values + */ + @Bean + public IMap> forwardedSubscriptionEventCache() { + return createHazelcastInstance("hazelCastInstanceSubscriptionEvents", + forwardedSubscriptionEventCacheMapConfig).getMap("forwardedSubscriptionEventCache"); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java deleted file mode 100644 index d2f16a71d4..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================== - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.event.avc; - -import com.hazelcast.config.MapConfig; -import com.hazelcast.map.IMap; -import java.util.Set; -import org.onap.cps.cache.HazelcastCacheConfig; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases. - */ -@Configuration -public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { - - public static final int SUBSCRIPTION_FORWARD_STARTED_TTL_SECS = 600; - - private static final MapConfig forwardedSubscriptionEventCacheMapConfig = - createMapConfig("forwardedSubscriptionEventCacheMapConfig"); - - /** - * Distributed instance of forwarded subscription information cache that contains subscription event - * id by dmi names as properties. - * - * @return configured map of subscription event ids as keys to sets of dmi names for values - */ - @Bean - public IMap> forwardedSubscriptionEventCache() { - return createHazelcastInstance("hazelCastInstanceSubscriptionEvents", - forwardedSubscriptionEventCacheMapConfig).getMap("forwardedSubscriptionEventCache"); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java deleted file mode 100644 index 9c7b79f733..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.event.avc; - -import com.hazelcast.map.IMap; -import java.util.Set; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; - -@Slf4j -@RequiredArgsConstructor -public class ResponseTimeoutTask implements Runnable { - - private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - private final String subscriptionClientId; - private final String subscriptionName; - - @Override - public void run() { - - try { - generateAndSendResponse(); - } catch (final Exception exception) { - log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}", - exception.toString()); - } - - } - - private void generateAndSendResponse() { - final String subscriptionEventId = subscriptionClientId + subscriptionName; - if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName, - dmiNames.isEmpty()); - forwardedSubscriptionEventCache.remove(subscriptionEventId); - } - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java deleted file mode 100644 index eb3daeb4da..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.event.avc; - -import com.hazelcast.map.IMap; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper; -import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class SubscriptionEventResponseConsumer { - - private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionPersistence subscriptionPersistence; - private final SubscriptionEventResponseMapper subscriptionEventResponseMapper; - private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - - @Value("${notification.enabled:true}") - private boolean notificationFeatureEnabled; - - @Value("${ncmp.model-loader.subscription:false}") - private boolean subscriptionModelLoaderEnabled; - - /** - * Consume subscription response event. - * - * @param subscriptionEventResponseConsumerRecord the event to be consumed - */ - @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) - public void consumeSubscriptionEventResponse( - final ConsumerRecord subscriptionEventResponseConsumerRecord) { - final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value(); - final String clientId = subscriptionEventResponse.getClientId(); - log.info("subscription event response of clientId: {} is received.", clientId); - final String subscriptionName = subscriptionEventResponse.getSubscriptionName(); - final String subscriptionEventId = clientId + subscriptionName; - boolean isFullOutcomeResponse = false; - if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - - dmiNames.remove(subscriptionEventResponse.getDmiName()); - forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, - ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); - isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); - - if (isFullOutcomeResponse) { - forwardedSubscriptionEventCache.remove(subscriptionEventId); - } - } - if (subscriptionModelLoaderEnabled) { - updateSubscriptionEvent(subscriptionEventResponse); - } - if (isFullOutcomeResponse && notificationFeatureEnabled) { - subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName, - isFullOutcomeResponse); - } - } - - private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { - final YangModelSubscriptionEvent yangModelSubscriptionEvent = - subscriptionEventResponseMapper - .toYangModelSubscriptionEvent(subscriptionEventResponse); - subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java deleted file mode 100644 index 136c1d6ff4..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.event.avc; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; - -@Mapper(componentModel = "spring") -public interface SubscriptionOutcomeMapper { - - @Mapping(source = "clientId", target = "event.subscription.clientID") - @Mapping(source = "subscriptionName", target = "event.subscription.name") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets", - qualifiedByName = "mapStatusToCmHandleRejected") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets", - qualifiedByName = "mapStatusToCmHandleAccepted") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets", - qualifiedByName = "mapStatusToCmHandlePending") - SubscriptionEventOutcome toSubscriptionEventOutcome( - SubscriptionEventResponse subscriptionEventResponse); - - /** - * Maps StatusToCMHandle to list of TargetCmHandle rejected. - * - * @param targets as a map - * @return TargetCmHandle list - */ - @Named("mapStatusToCmHandleRejected") - default List mapStatusToCmHandleRejected(Map targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } - - /** - * Maps StatusToCMHandle to list of TargetCmHandle accepted. - * - * @param targets as a map - * @return TargetCmHandle list - */ - @Named("mapStatusToCmHandleAccepted") - default List mapStatusToCmHandleAccepted(Map targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } - - /** - * Maps StatusToCMHandle to list of TargetCmHandle pending. - * - * @param targets as a map - * @return TargetCmHandle list - */ - @Named("mapStatusToCmHandlePending") - default List mapStatusToCmHandlePending(Map targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java new file mode 100644 index 0000000000..a81f8fd731 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class ResponseTimeoutTask implements Runnable { + + private final IMap> forwardedSubscriptionEventCache; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; + private final String subscriptionClientId; + private final String subscriptionName; + + @Override + public void run() { + + try { + generateAndSendResponse(); + } catch (final Exception exception) { + log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}", + exception.toString()); + } + + } + + private void generateAndSendResponse() { + final String subscriptionEventId = subscriptionClientId + subscriptionName; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName, + dmiNames.isEmpty()); + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 19a0f12b0b..9e363f3cdd 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -35,8 +35,7 @@ import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.header.Headers; -import org.onap.cps.ncmp.api.impl.event.avc.ForwardedSubscriptionEventCacheConfig; -import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java new file mode 100644 index 0000000000..a1860a6136 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import com.hazelcast.map.IMap; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventResponseConsumer { + + private final IMap> forwardedSubscriptionEventCache; + private final SubscriptionPersistence subscriptionPersistence; + private final SubscriptionEventResponseMapper subscriptionEventResponseMapper; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume subscription response event. + * + * @param subscriptionEventResponseConsumerRecord the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) + public void consumeSubscriptionEventResponse( + final ConsumerRecord subscriptionEventResponseConsumerRecord) { + final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value(); + final String clientId = subscriptionEventResponse.getClientId(); + log.info("subscription event response of clientId: {} is received.", clientId); + final String subscriptionName = subscriptionEventResponse.getSubscriptionName(); + final String subscriptionEventId = clientId + subscriptionName; + boolean isFullOutcomeResponse = false; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + + dmiNames.remove(subscriptionEventResponse.getDmiName()); + forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + + if (isFullOutcomeResponse) { + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } + if (subscriptionModelLoaderEnabled) { + updateSubscriptionEvent(subscriptionEventResponse); + } + if (isFullOutcomeResponse && notificationFeatureEnabled) { + subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName, + isFullOutcomeResponse); + } + } + + private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { + final YangModelSubscriptionEvent yangModelSubscriptionEvent = + subscriptionEventResponseMapper + .toYangModelSubscriptionEvent(subscriptionEventResponse); + subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java index ade3f22f4b..a74682571b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java @@ -30,7 +30,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java new file mode 100644 index 0000000000..cecde5f816 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; + +@Mapper(componentModel = "spring") +public interface SubscriptionOutcomeMapper { + + @Mapping(source = "clientId", target = "event.subscription.clientID") + @Mapping(source = "subscriptionName", target = "event.subscription.name") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets", + qualifiedByName = "mapStatusToCmHandleRejected") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets", + qualifiedByName = "mapStatusToCmHandleAccepted") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets", + qualifiedByName = "mapStatusToCmHandlePending") + SubscriptionEventOutcome toSubscriptionEventOutcome( + SubscriptionEventResponse subscriptionEventResponse); + + /** + * Maps StatusToCMHandle to list of TargetCmHandle rejected. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleRejected") + default List mapStatusToCmHandleRejected(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle accepted. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleAccepted") + default List mapStatusToCmHandleAccepted(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle pending. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandlePending") + default List mapStatusToCmHandlePending(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } +} -- cgit 1.2.3-korg