summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormpriyank <priyank.maheshwari@est.tech>2023-07-26 17:33:35 +0100
committermpriyank <priyank.maheshwari@est.tech>2023-08-18 13:40:10 +0100
commitf4778800c815fbc962b194a177525957a564231d (patch)
tree1075f0c9173f0511cf9e7410adcf80494355aceb
parenta52b1825f99318181cd356dcde1b1db46c1098ac (diff)
Device heartbeat listener
- Infrastructure code to have the kafka listener and distributed set in place - performance tested locally - testware added Issue-ID: CPS-1642 Change-Id: I775dbe6e6b520b8777faa08610db439877757572 Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
-rw-r--r--cps-application/src/main/resources/application.yml2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java46
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java71
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java37
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java25
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy102
-rw-r--r--cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java12
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy13
8 files changed, 305 insertions, 3 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index a18de2acd..6aefda9c3 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -109,6 +109,8 @@ app:
dmi:
cm-events:
topic: ${DMI_CM_EVENTS_TOPIC:dmi-cm-events}
+ device-heartbeat:
+ topic: ${DMI_DEVICE_HEARTBEAT_TOPIC:dmi-device-heartbeat}
notification:
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java
new file mode 100644
index 000000000..816fc5067
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+
+import com.hazelcast.collection.ISet;
+import com.hazelcast.config.SetConfig;
+import org.onap.cps.cache.HazelcastCacheConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TrustLevelCacheConfig extends HazelcastCacheConfig {
+
+ private static final SetConfig untrustworthyCmHandlesSetConfig = createSetConfig("untrustworthyCmHandlesSetConfig");
+
+ /**
+ * Untrustworthy cmhandle set instance.
+ *
+ * @return instance of distributed set of untrustworthy cmhandles.
+ */
+ @Bean
+ public ISet<String> untrustworthyCmHandlesSet() {
+ return createHazelcastInstance("untrustworthyCmHandlesSet", untrustworthyCmHandlesSetConfig).getSet(
+ "untrustworthyCmHandlesSet");
+ }
+
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java
new file mode 100644
index 000000000..458c1b851
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
+
+import com.hazelcast.collection.ISet;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DeviceHeartbeatConsumer {
+
+ private final ISet<String> untrustworthyCmHandlesSet;
+
+ /**
+ * Listening the device heartbeats.
+ *
+ * @param deviceHeartbeatConsumerRecord Device Heartbeat record.
+ */
+ @KafkaListener(topics = "${app.dmi.device-heartbeat.topic}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
+
+ final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id");
+
+ final DeviceTrustLevel deviceTrustLevel =
+ toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+
+ if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) {
+ log.warn("No or Invalid trust level defined");
+ return;
+ }
+
+ if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) {
+ untrustworthyCmHandlesSet.add(cmHandleId);
+ log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId);
+ } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains(
+ cmHandleId)) {
+ untrustworthyCmHandlesSet.remove(cmHandleId);
+ log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId);
+ }
+ }
+
+}
+
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java
new file mode 100644
index 000000000..2ed4e4522
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@Data
+@NoArgsConstructor
+class DeviceTrustLevel implements Serializable {
+
+ private static final long serialVersionUID = -1705715024067165212L;
+
+ private TrustLevel trustLevel;
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java
new file mode 100644
index 000000000..f4254bb47
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java
@@ -0,0 +1,25 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+public enum TrustLevel {
+ NONE, COMPLETE;
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy
new file mode 100644
index 000000000..48de23dca
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.collection.ISet
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class DeviceHeartbeatConsumerSpec extends Specification {
+
+ def mockUntrustworthyCmHandlesSet = Mock(ISet<String>)
+ def objectMapper = new ObjectMapper()
+
+ def objectUnderTest = new DeviceHeartbeatConsumer(mockUntrustworthyCmHandlesSet)
+
+ def 'Operations to be done in an empty untrustworthy set for #scenario'() {
+ given: 'an event with trustlevel as #trustLevel'
+ def incomingEvent = testCloudEvent(trustLevel)
+ and: 'transformed as a kafka record'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent)
+ consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+ when: 'the event is consumed'
+ objectUnderTest.heartbeatListener(consumerRecord)
+ then: 'untrustworthy cmhandles are stored'
+ untrustworthyCmHandlesSetInvocationForAdd * mockUntrustworthyCmHandlesSet.add(_)
+ and: 'trustworthy cmHandles will be removed from untrustworthy set'
+ untrustworthyCmHandlesSetInvocationForContains * mockUntrustworthyCmHandlesSet.contains(_)
+
+ where: 'below scenarios are applicable'
+ scenario | trustLevel || untrustworthyCmHandlesSetInvocationForAdd | untrustworthyCmHandlesSetInvocationForContains
+ 'None trust' | TrustLevel.NONE || 1 | 0
+ 'Complete trust' | TrustLevel.COMPLETE || 0 | 1
+ }
+
+ def 'Invalid trust'() {
+ when: 'we provide an invalid trust in the event'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', testCloudEvent(null))
+ consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+ objectUnderTest.heartbeatListener(consumerRecord)
+ then: 'no interaction with the untrustworthy cmhandles set'
+ 0 * mockUntrustworthyCmHandlesSet.add(_)
+ 0 * mockUntrustworthyCmHandlesSet.contains(_)
+ 0 * mockUntrustworthyCmHandlesSet.remove(_)
+ and: 'control flow returns without any exception'
+ noExceptionThrown()
+
+ }
+
+ def 'Remove trustworthy cmhandles from untrustworthy cmhandles set'() {
+ given: 'an event with COMPLETE trustlevel'
+ def incomingEvent = testCloudEvent(TrustLevel.COMPLETE)
+ and: 'transformed as a kafka record'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent)
+ consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+ and: 'untrustworthy cmhandles set contains cmhandle1'
+ 1 * mockUntrustworthyCmHandlesSet.contains(_) >> true
+ when: 'the event is consumed'
+ objectUnderTest.heartbeatListener(consumerRecord)
+ then: 'cmhandle removed from untrustworthy cmhandles set'
+ 1 * mockUntrustworthyCmHandlesSet.remove(_) >> {
+ args ->
+ {
+ args[0].equals('cmhandle1')
+ }
+ }
+
+ }
+
+ def testCloudEvent(trustLevel) {
+ return CloudEventBuilder.v1().withData(objectMapper.writeValueAsBytes(new DeviceTrustLevel(trustLevel)))
+ .withId("cmhandle1")
+ .withSource(URI.create('DMI'))
+ .withDataSchema(URI.create('test'))
+ .withType('org.onap.cm.events.trustlevel-notification')
+ .build()
+ }
+
+}
diff --git a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java
index 405e6e2a8..067191b5a 100644
--- a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java
+++ b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java
@@ -24,6 +24,7 @@ import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.NamedConfig;
import com.hazelcast.config.QueueConfig;
+import com.hazelcast.config.SetConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import lombok.extern.slf4j.Slf4j;
@@ -57,6 +58,10 @@ public class HazelcastCacheConfig {
if (namedConfig instanceof QueueConfig) {
config.addQueueConfig((QueueConfig) namedConfig);
}
+ if (namedConfig instanceof SetConfig) {
+ config.addSetConfig((SetConfig) namedConfig);
+ }
+
config.setClusterName(clusterName);
updateDiscoveryMode(config);
return config;
@@ -76,6 +81,13 @@ public class HazelcastCacheConfig {
return commonQueueConfig;
}
+ protected static SetConfig createSetConfig(final String configName) {
+ final SetConfig commonSetConfig = new SetConfig(configName);
+ commonSetConfig.setBackupCount(1);
+ commonSetConfig.setAsyncBackupCount(1);
+ return commonSetConfig;
+ }
+
protected void updateDiscoveryMode(final Config config) {
if (cacheKubernetesEnabled) {
log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
diff --git a/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy
index 8efd48547..415e9fd49 100644
--- a/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy
@@ -45,10 +45,17 @@ class HazelcastCacheConfigSpec extends Specification {
} else {
assert result.config.queueConfigs.isEmpty()
}
+ and: 'if applicable it has a set config with the expected name'
+ if (expectSetConfig) {
+ assert result.config.setConfigs.values()[0].name == 'my set config'
+ } else {
+ assert result.config.setConfigs.isEmpty()
+ }
where: 'the following configs are used'
- scenario | config || expectMapConfig | expectQueueConfig
- 'Map Config' | HazelcastCacheConfig.createMapConfig('my map config') || true | false
- 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true
+ scenario | config || expectMapConfig | expectQueueConfig | expectSetConfig
+ 'Map Config' | HazelcastCacheConfig.createMapConfig('my map config') || true | false | false
+ 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true | false
+ 'Set Config' | HazelcastCacheConfig.createSetConfig('my set config') || false | false | true
}
}