summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
blob: 0b672663754b5bff157a6f6ccb77796836613aea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
 * ============LICENSE_START========================================================
 *  Copyright (C) 2022-2023 Nordix Foundation
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *  SPDX-License-Identifier: Apache-2.0
 *  ============LICENSE_END=========================================================
 */

package org.onap.cps.ncmp.api.impl.config.embeddedcache;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.NamedConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases.
 *
 * <p>Every bean defined here starts its own Hazelcast instance (all of them configured with the
 * cluster name {@code synchronization-caches}) and exposes a single distributed structure from it.
 */
@Slf4j
@Configuration
public class SynchronizationCacheConfig {

    /**
     * Time-to-live, in seconds, associated with 'module sync started' entries.
     * Not referenced inside this class; presumably applied by the callers that add entries (to be confirmed).
     */
    public static final int MODULE_SYNC_STARTED_TTL_SECS = 600;

    /**
     * Time-to-live, in seconds, associated with data sync semaphore entries.
     * Not referenced inside this class; presumably applied by the callers that add entries (to be confirmed).
     */
    public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800;

    private static final String CLUSTER_NAME = "synchronization-caches";

    // Names of the distributed structures. They are also used as the names of the matching
    // queue/map configurations: Hazelcast resolves the configuration of a structure by the
    // structure's own name (falling back to "default"), so a configuration registered under
    // any other name would silently be ignored.
    private static final String MODULE_SYNC_WORK_QUEUE_NAME = "moduleSyncWorkQueue";
    private static final String MODULE_SYNC_STARTED_ON_CM_HANDLES_NAME = "moduleSyncStartedOnCmHandles";
    private static final String DATA_SYNC_SEMAPHORES_NAME = "dataSyncSemaphores";

    private static final int SYNC_BACKUP_COUNT = 3;
    private static final int ASYNC_BACKUP_COUNT = 3;

    private static final QueueConfig moduleSyncWorkQueueConfig =
        createQueueConfig(MODULE_SYNC_WORK_QUEUE_NAME);
    private static final MapConfig moduleSyncStartedConfig =
        createMapConfig(MODULE_SYNC_STARTED_ON_CM_HANDLES_NAME);
    private static final MapConfig dataSyncSemaphoresConfig =
        createMapConfig(DATA_SYNC_SEMAPHORES_NAME);

    @Value("${hazelcast.mode.kubernetes.enabled}")
    private boolean cacheKubernetesEnabled;

    @Value("${hazelcast.mode.kubernetes.service-name}")
    private String cacheKubernetesServiceName;

    /**
     * Module Sync Distributed Queue Instance.
     *
     * @return queue of cm handles (data nodes) that need module sync
     */
    @Bean
    public BlockingQueue<DataNode> moduleSyncWorkQueue() {
        return createHazelcastInstance(MODULE_SYNC_WORK_QUEUE_NAME, moduleSyncWorkQueueConfig)
            .getQueue(MODULE_SYNC_WORK_QUEUE_NAME);
    }

    /**
     * Module Sync started (and maybe finished) on cm handles (ids).
     *
     * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed
     */
    @Bean
    public IMap<String, Object> moduleSyncStartedOnCmHandles() {
        return createHazelcastInstance(MODULE_SYNC_STARTED_ON_CM_HANDLES_NAME, moduleSyncStartedConfig)
            .getMap(MODULE_SYNC_STARTED_ON_CM_HANDLES_NAME);
    }

    /**
     * Data Sync Distributed Map Instance.
     *
     * @return configured map of data sync semaphores
     */
    @Bean
    public IMap<String, Boolean> dataSyncSemaphores() {
        return createHazelcastInstance(DATA_SYNC_SEMAPHORES_NAME, dataSyncSemaphoresConfig)
            .getMap(DATA_SYNC_SEMAPHORES_NAME);
    }

    /**
     * Starts a new Hazelcast instance carrying the given structure configuration.
     *
     * @param hazelcastInstanceName name given to the new Hazelcast instance
     * @param namedConfig           queue or map configuration to register on the instance
     * @return the newly created Hazelcast instance
     */
    private HazelcastInstance createHazelcastInstance(
        final String hazelcastInstanceName, final NamedConfig namedConfig) {
        return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
    }

    /**
     * Builds the configuration of one Hazelcast instance: registers the structure configuration,
     * sets the cluster name and, when enabled, the kubernetes discovery settings.
     *
     * @param instanceName name of the Hazelcast instance
     * @param namedConfig  queue or map configuration; any other type is not registered
     * @return the assembled instance configuration
     */
    private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
        final Config config = new Config(instanceName);
        if (namedConfig instanceof MapConfig) {
            config.addMapConfig((MapConfig) namedConfig);
        }
        if (namedConfig instanceof QueueConfig) {
            config.addQueueConfig((QueueConfig) namedConfig);
        }
        config.setClusterName(CLUSTER_NAME);
        updateDiscoveryMode(config);
        return config;
    }

    /**
     * Creates a queue configuration with the common backup settings.
     *
     * @param queueName name of the queue the configuration applies to
     * @return the queue configuration
     */
    private static QueueConfig createQueueConfig(final String queueName) {
        final QueueConfig queueConfig = new QueueConfig(queueName);
        queueConfig.setBackupCount(SYNC_BACKUP_COUNT);
        queueConfig.setAsyncBackupCount(ASYNC_BACKUP_COUNT);
        return queueConfig;
    }

    /**
     * Creates a map configuration with the common backup settings.
     *
     * @param configName name of the configuration; it must equal the name of the map it is meant for,
     *                   otherwise Hazelcast will not apply it to that map
     * @return the map configuration
     */
    private static MapConfig createMapConfig(final String configName) {
        final MapConfig mapConfig = new MapConfig(configName);
        mapConfig.setBackupCount(SYNC_BACKUP_COUNT);
        mapConfig.setAsyncBackupCount(ASYNC_BACKUP_COUNT);
        return mapConfig;
    }

    /**
     * Enables kubernetes based member discovery (using the configured service name)
     * when the kubernetes mode property is set; otherwise leaves the join configuration untouched.
     *
     * @param config instance configuration to update
     */
    private void updateDiscoveryMode(final Config config) {
        if (cacheKubernetesEnabled) {
            log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
            config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
                    .setProperty("service-name", cacheKubernetesServiceName);
        }
    }

}