aboutsummaryrefslogtreecommitdiffstats
path: root/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java
blob: 623a07c752e8b6c830f9ad2841ccae5938285c4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 * ============LICENSE_START==========================================
 * org.onap.music
 * ===================================================================
 *  Copyright (c) 2019 AT&T Intellectual Property
 * ===================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * 
 * ============LICENSE_END=============================================
 * ====================================================================
 */

package org.onap.music.lockingservice.cassandra;

import java.util.HashSet;
import java.util.Set;
import org.onap.music.datastore.MusicDataStoreHandle;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
import org.onap.music.exceptions.MusicServiceException;
import org.onap.music.main.MusicCore;
import org.onap.music.main.MusicUtil;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;

public class LockCleanUpDaemon extends Thread {
    
    boolean terminated = false;
    private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(LockCleanUpDaemon.class);

    
    public LockCleanUpDaemon() {
    }
    
    @Override
    public void run() {
        if (MusicUtil.getLockDaemonSleepTimeMs()<0) {
            terminate();
        }
        while (!terminated) {
            try {
                cleanupStaleLocks();
            } catch (MusicServiceException e) {
                logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clean up locks", e);
            }
            try {
                Thread.sleep(MusicUtil.getLockDaemonSleepTimeMs());
            } catch (InterruptedException e) {
                logger.warn(EELFLoggerDelegate.applicationLogger, "Interrupted exception", e);

            }
        }
    }

    private void cleanupStaleLocks() throws MusicServiceException {
        Set<String> lockQTables = getLockQTables();
        logger.info(EELFLoggerDelegate.applicationLogger, "Lock q tables found: " + lockQTables);
        for(String lockTable: lockQTables) {
            try {
                cleanUpLocksFromTable(lockTable);
            } catch (MusicServiceException e) {
                logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clear locks on table " + lockTable, e);
            }
        }
    }


    private Set<String> getLockQTables() throws MusicServiceException {
        Set<String> keyspacesToCleanUp = MusicUtil.getKeyspacesToCleanLocks();
        Set<String> lockQTables = new HashSet<>();
        
        PreparedQueryObject query = new PreparedQueryObject();
        query.appendQueryString("SELECT keyspace_name, table_name FROM system_schema.tables;");
        ResultSet results = MusicCore.get(query);
        
        for (Row row: results) {
            if (keyspacesToCleanUp.contains(row.getString("keyspace_name"))
                    && row.getString("table_name").toLowerCase().startsWith(CassaLockStore.table_prepend_name.toLowerCase()) ) {
                lockQTables.add(row.getString("keyspace_name") + "." + row.getString("table_name"));
            }
        }
        return lockQTables;
    }

    private void cleanUpLocksFromTable(String lockTable) throws MusicServiceException {
        PreparedQueryObject query = new PreparedQueryObject();
        query.appendQueryString("SELECT * from " + lockTable);
        ResultSet results = MusicCore.get(query);
        for (Row lock: results) {
            if (!lock.isNull("lockreference")) {
                try {
                    deleteLockIfStale(lockTable, lock);
                } catch (MusicServiceException e) {
                    logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to delete a potentially stale lock " + lock, e);
                }
            }
        }
    }
    
    
    private void deleteLockIfStale(String lockTable, Row lock) throws MusicServiceException {
        final String CREATETIME="createtime";
        final String ACQUIRETIME="acquiretime";
        if (lock.isNull(CREATETIME) && lock.isNull("acquiretime")) {
            return;
        }

        long createTime = lock.isNull(CREATETIME) ? 0 : Long.parseLong(lock.getString(CREATETIME));
        long acquireTime = lock.isNull(ACQUIRETIME) ? 0 : Long.parseLong(lock.getString(ACQUIRETIME));
        long row_access_time = Math.max(createTime, acquireTime);
        if (System.currentTimeMillis() > row_access_time + MusicUtil.getDefaultLockLeasePeriod()) {
            logger.info(EELFLoggerDelegate.applicationLogger, "Stale lock detected and being removed: " + lock);
            PreparedQueryObject query = new PreparedQueryObject();
            query.appendQueryString("DELETE FROM " + lockTable + " WHERE key='" + lock.getString("key") + "' AND " +
                    "lockreference=" + lock.getLong("lockreference") + " IF EXISTS;");
            MusicDataStoreHandle.getDSHandle().getSession().execute(query.getQuery());
        }
    }

    public void terminate() {
        terminated = true;
    }
}