summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java35
1 files changed, 22 insertions, 13 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
index 20df706c0..ddb9fd6fc 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import io.cloudevents.CloudEvent;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -32,8 +33,9 @@ import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEven
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
@@ -61,19 +63,21 @@ public class SubscriptionEventResponseConsumer {
* @param subscriptionEventResponseConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEventResponse(
- final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
- final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
- final String clientId = subscriptionEventResponse.getClientId();
+ final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) {
+ final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value();
+ final String eventType = subscriptionEventResponseConsumerRecord.value().getType();
+ final SubscriptionEventResponse subscriptionEventResponse =
+ SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent);
+ final String clientId = subscriptionEventResponse.getData().getClientId();
log.info("subscription event response of clientId: {} is received.", clientId);
- final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = clientId + subscriptionName;
boolean createOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
-
- dmiNames.remove(subscriptionEventResponse.getDmiName());
+ dmiNames.remove(subscriptionEventResponse.getData().getDmiName());
forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
@@ -84,7 +88,7 @@ public class SubscriptionEventResponseConsumer {
if (createOutcomeResponse
&& notificationFeatureEnabled
&& hasNoPendingCmHandles(clientId, subscriptionName)) {
- subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType);
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
@@ -92,10 +96,15 @@ public class SubscriptionEventResponseConsumer {
private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
clientId, subscriptionName);
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
- dataNodeSubscription);
- return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription);
+ for (final Map<String, String> statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) {
+ final String status = statusAndDetailsMap.get("status");
+ if (SubscriptionStatus.PENDING.toString().equals(status)) {
+ return false;
+ }
+ }
+ return true;
}
private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {