diff options
author | arthurdent3 <tn1381@att.com> | 2018-02-02 21:19:53 -0500 |
---|---|---|
committer | arthurdent3 <tn1381@att.com> | 2018-02-05 10:20:49 -0500 |
commit | e99b7fa829bf957c2a46223a1a20a32aebeda91b (patch) | |
tree | 59ea8681b4165df7fb2482af3f6d411115e32f5a /src/main/java/org/onap/music/lockingservice | |
parent | d221feba08b6ad24e7d232247306f7b67934941d (diff) |
Initial code Import.
Issue-ID: MUSIC-21
Change-Id: I89ceab0891b4b7cb999dab532d6bae9092f027cc
Signed-off-by: arthurdent3 <tn1381@att.com>
Diffstat (limited to 'src/main/java/org/onap/music/lockingservice')
7 files changed, 1005 insertions, 0 deletions
diff --git a/src/main/java/org/onap/music/lockingservice/LockListener.java b/src/main/java/org/onap/music/lockingservice/LockListener.java new file mode 100644 index 00000000..33188e60 --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/LockListener.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +/** + * This class has two methods which are call back methods when a lock is acquired and when the lock + * is released. + * + */ +public interface LockListener { + /** + * call back called when the lock is acquired + */ + public void lockAcquired(); + + /** + * call back called when the lock is released. + */ + public void lockReleased(); +} diff --git a/src/main/java/org/onap/music/lockingservice/MusicLockState.java b/src/main/java/org/onap/music/lockingservice/MusicLockState.java new file mode 100644 index 00000000..23661ad4 --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/MusicLockState.java @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.onap.music.main.MusicCore; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +// the state variable that will be stored in zookeeper, capturing the transitions of +public class MusicLockState implements Serializable { + public enum LockStatus { + UNLOCKED, BEING_LOCKED, LOCKED + };// captures the state of the lock + + private static EELFLogger logger = EELFManager.getInstance().getLogger(MusicLockState.class); + LockStatus lockStatus; + boolean needToSyncQuorum = false; + String lockHolder; + long leasePeriod = Long.MAX_VALUE, leaseStartTime = -1; + + public MusicLockState(LockStatus lockStatus, String lockHolder) { + this.lockStatus = lockStatus; + this.lockHolder = lockHolder; + } + + public MusicLockState(LockStatus lockStatus, String lockHolder, boolean needToSyncQuorum) { + this.lockStatus = lockStatus; + this.lockHolder = lockHolder; + this.needToSyncQuorum = needToSyncQuorum; + } + + + public long getLeasePeriod() { + return leasePeriod; + } + + public boolean isNeedToSyncQuorum() { + return needToSyncQuorum; + } + + + + public void setLeasePeriod(long leasePeriod) { + this.leasePeriod = leasePeriod; + } + + + public long getLeaseStartTime() { + return leaseStartTime; + } + + + public void setLeaseStartTime(long leaseStartTime) { + this.leaseStartTime = leaseStartTime; + } + + + + public LockStatus getLockStatus() { + return lockStatus; + } + + public void setLockStatus(LockStatus lockStatus) { + this.lockStatus = lockStatus; + } + + public String getLockHolder() { + return lockHolder; + } + + public void setLockHolder(String lockHolder) { + this.lockHolder = lockHolder; + } + + public byte[] serialize() { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = null; + try { + out = new ObjectOutputStream(bos); + out.writeObject(this); + } catch (IOException e) { + logger.error(e.getMessage()); + } + return bos.toByteArray(); + } + + public static MusicLockState deSerialize(byte[] data) { + ByteArrayInputStream bis = new ByteArrayInputStream(data); + Object o = null; + ObjectInput in = null; + try { + in = new ObjectInputStream(bis); + o = in.readObject(); + } catch (ClassNotFoundException | IOException e) { + logger.error(e.getMessage()); + } + return (MusicLockState) o; + } +} diff --git a/src/main/java/org/onap/music/lockingservice/MusicLockingService.java b/src/main/java/org/onap/music/lockingservice/MusicLockingService.java new file mode 100644 index 00000000..59b502ca --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/MusicLockingService.java @@ -0,0 +1,142 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + + +import java.io.IOException; +import java.util.StringTokenizer; +import java.util.concurrent.CountDownLatch; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicCore; +import org.onap.music.main.MusicUtil; +import org.apache.zookeeper.ZooKeeper; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.onap.music.datastore.MusicDataStore; +import org.onap.music.eelf.logging.EELFLoggerDelegate; + +public class MusicLockingService implements Watcher { + + private static final int SESSION_TIMEOUT = 180000; + ZkStatelessLockService zkLockHandle = null; + private CountDownLatch connectedSignal = new CountDownLatch(1); + private static EELFLogger logger = + EELFManager.getInstance().getLogger(MusicLockingService.class); + // private static EELFLoggerDelegate logger = + // EELFLoggerDelegate.getLogger(MusicLockingService.class); + + public MusicLockingService() throws MusicServiceException { + try { + ZooKeeper zk = new ZooKeeper(MusicUtil.getMyZkHost(), SESSION_TIMEOUT, this); + connectedSignal.await(); + zkLockHandle = new ZkStatelessLockService(zk); + } catch (IOException e) { + logger.error(e.getMessage()); + throw new MusicServiceException("IO Error has occured" + e.getMessage()); + } catch (InterruptedException e) { + logger.error(e.getMessage()); + throw new MusicServiceException("Exception Occured " + e.getMessage()); + } + } + + public ZkStatelessLockService getzkLockHandle() { + return zkLockHandle; + } + + public MusicLockingService(String lockServer) { + try { + ZooKeeper zk = new ZooKeeper(lockServer, SESSION_TIMEOUT, this); + connectedSignal.await(); + zkLockHandle = new ZkStatelessLockService(zk); + } catch (IOException | InterruptedException e) { + logger.error(e.getMessage()); + } + } + + public void createLockaIfItDoesNotExist(String lockName) { + if (zkLockHandle.checkIfLockExists(lockName) == false) { + String lockHolder = null; + MusicLockState ml = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder); + byte[] data = ml.serialize(); + zkLockHandle.createLock(lockName, data); + } + } + + public void setLockState(String lockName, MusicLockState mls) { + byte[] data = mls.serialize(); + zkLockHandle.setNodeData(lockName, data); + } + + public MusicLockState getLockState(String lockName) { + + byte[] data = zkLockHandle.getNodeData(lockName); + return MusicLockState.deSerialize(data); + } + + public String createLockId(String lockName) { + String lockIdWithSlash = zkLockHandle.createLockId(lockName); + return lockIdWithSlash.replace('/', '$'); + } + + public boolean isMyTurn(String lockIdWithDollar) { + String lockId = lockIdWithDollar.replace('$', '/'); + StringTokenizer st = new StringTokenizer(lockId); + String lockName = "/" + st.nextToken("/"); + try { + return zkLockHandle.lock(lockName, lockId); + } catch (KeeperException | InterruptedException e) { + logger.error(e.getMessage()); + } + return false; + } + + public void unlockAndDeleteId(String lockIdWithDollar) { + String lockId = lockIdWithDollar.replace('$', '/'); + zkLockHandle.unlock(lockId); + } + + public void deleteLock(String lockName) { + zkLockHandle.deleteLock(lockName); + } + + public String whoseTurnIsIt(String lockName) { + String lockHolder = zkLockHandle.currentLockHolder(lockName); + return lockHolder.replace('/', '$'); + + } + + public void process(WatchedEvent event) { // Watcher interface + if (event.getState() == KeeperState.SyncConnected) { + connectedSignal.countDown(); + } + } + + + public void close() { + zkLockHandle.close(); + } + +} diff --git a/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java b/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java new file mode 100644 index 00000000..37ae9e96 --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java @@ -0,0 +1,205 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.onap.music.lockingservice.ZooKeeperOperation; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for protocol implementations which provides a number of higher level helper methods + * for working with ZooKeeper along with retrying synchronous operations if the connection to + * ZooKeeper closes such as {@link #retryOperation(ZooKeeperOperation)} + * + */ +class ProtocolSupport { + private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class); + + protected ZooKeeper zookeeper; + private AtomicBoolean closed = new AtomicBoolean(false); + private long retryDelay = 500L; + private int retryCount = 10; + private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + // public ProtocolSupport(ZooKeeper zookeeper) { + // this.zookeeper = zookeeper; + // } + + /** + * Closes this strategy and releases any ZooKeeper resources; but keeps the ZooKeeper instance + * open + */ + public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + /** + * return zookeeper client instance + * + * @return zookeeper client instance + */ + public ZooKeeper getZookeeper() { + return zookeeper; + } + + /** + * return the acl its using + * + * @return the acl. + */ + public List<ACL> getAcl() { + return acl; + } + + /** + * set the acl + * + * @param acl the acl to set to + */ + public void setAcl(List<ACL> acl) { + this.acl = acl; + } + + /** + * get the retry delay in milliseconds + * + * @return the retry delay + */ + public long getRetryDelay() { + return retryDelay; + } + + /** + * Sets the time waited between retry delays + * + * @param retryDelay the retry delay + */ + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + /** + * Allow derived classes to perform some custom closing operations to release resources + */ + protected void doClose() {} + + + /** + * Perform the given operation, retrying if the connection fails + * + * @return object. it needs to be cast to the callee's expected return type. + * @param operation FILL IN + * @throws KeeperException FILL IN + * @throws InterruptedException FILL IN + */ + protected Object retryOperation(ZooKeeperOperation operation) + throws KeeperException, InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return operation.execute(); + } catch (KeeperException.SessionExpiredException e) { + LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e); + throw e; + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + LOG.debug("Attempt " + i + " failed with connection loss so " + + "attempting to reconnect: " + e, e); + retryDelay(i); + } + } + throw exception; + } + + /** + * Ensures that the given path exists with no data, the current ACL and no flags + * + * @param path the lock path + */ + protected void ensurePathExists(String path) { + ensureExists(path, null, acl, CreateMode.PERSISTENT); + } + + /** + * Ensures that the given path exists with the given data, ACL and flags + * + * @param path the lock path + * @param data the data + * @param acl list of ACLs applying to the path + * @param flags create mode flags + */ + protected void ensureExists(final String path, final byte[] data, final List<ACL> acl, + final CreateMode flags) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + Stat stat = zookeeper.exists(path, false); + if (stat != null) { + return true; + } + zookeeper.create(path, data, acl, flags); + return true; + } + }); + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + } + } + + /** + * Returns true if this protocol has been closed + * + * @return true if this protocol is closed + */ + protected boolean isClosed() { + return closed.get(); + } + + /** + * Performs a retry delay if this is not the first attempt + * + * @param attemptCount the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) { + if (attemptCount > 0) { + try { + Thread.sleep(attemptCount * retryDelay); + } catch (InterruptedException e) { + LOG.debug("Failed to sleep: " + e, e); + } + } + } +} diff --git a/src/main/java/org/onap/music/lockingservice/ZNodeName.java b/src/main/java/org/onap/music/lockingservice/ZNodeName.java new file mode 100644 index 00000000..c8d14cba --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/ZNodeName.java @@ -0,0 +1,117 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * Represents an ephemeral znode name which has an ordered sequence number and can be sorted in + * order + * + */ +class ZNodeName implements Comparable<ZNodeName> { + private final String name; + private String prefix; + private int sequence = -1; + private static EELFLogger LOG = EELFManager.getInstance().getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + LOG.info("Number format exception for " + idx, e); + } catch (ArrayIndexOutOfBoundsException e) { + LOG.info("Array out of bounds for " + idx, e); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) + return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + public int compareTo(ZNodeName that) { + int answer = this.prefix.compareTo(that.prefix); + if (answer == 0) { + int s1 = this.sequence; + int s2 = that.sequence; + if (s1 == -1 && s2 == -1) { + return this.name.compareTo(that.name); + } + answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } +} diff --git a/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java b/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java new file mode 100644 index 00000000..99ccd4db --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java @@ -0,0 +1,334 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * A <a href="package.html">protocol to implement an exclusive write lock or to elect a leader</a>. + * <p/> + * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then + * or it may be some time later. + * <p/> + * You can register a listener so that you are invoked when you get the lock; otherwise you can ask + * if you have the lock by calling {@link #isOwner()} + * + */ +public class ZkStatelessLockService extends ProtocolSupport { + public ZkStatelessLockService(ZooKeeper zk) { + zookeeper = zk; + } + + private static EELFLogger LOG = + EELFManager.getInstance().getLogger(ZkStatelessLockService.class); + + protected void createLock(final String path, final byte[] data) { + final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.create(path, data, acl, CreateMode.PERSISTENT); + return true; + } + }); + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + } + } + + public void close() { + try { + zookeeper.close(); + } catch (InterruptedException e) { + LOG.info(e.getMessage()); + } + } + + public void setNodeData(final String lockName, final byte[] data) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.getSessionId(); + zookeeper.setData("/" + lockName, data, -1); + return true; + } + }); + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + } + + } + + public byte[] getNodeData(final String lockName) { + try { + if (zookeeper.exists("/" + lockName, null) != null) + return zookeeper.getData("/" + lockName, false, null); + else + return null; + + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + } + return null; + } + + public boolean checkIfLockExists(String lockName) { + boolean result = false; + try { + Stat stat = zookeeper.exists(lockName, false); + if (stat != null) { + result = true; + } + } catch (KeeperException e) { + LOG.info(e.getMessage()); + } catch (InterruptedException e) { + LOG.info(e.getMessage()); + } + return result; + } + + public void createNode(String nodeName) { + ensurePathExists(nodeName); + } + + public String createLockId(String dir) { + ensurePathExists(dir); + LockZooKeeperOperation zop = new LockZooKeeperOperation(dir); + + try { + retryOperation(zop); + } catch (KeeperException e) { + LOG.info(e.getMessage()); + } catch (InterruptedException e) { + LOG.info(e.getMessage()); + } + return zop.getId(); + } + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was acquired. Note + * that the exclusive lock may be acquired some time later after this method has been invoked + * due to the current lock owner going away. + */ + public synchronized boolean lock(String dir, String lockId) + throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + LockZooKeeperOperation zop = new LockZooKeeperOperation(dir, lockId); + return (Boolean) retryOperation(zop); + } + + /** + * Removes the lock or associated znode if you no longer require the lock. this also removes + * your request in the queue for locking in case you do not already hold the lock. + * + * @throws RuntimeException throws a runtime exception if it cannot connect to zookeeper. + */ + public synchronized void unlock(String lockId) throws RuntimeException { + final String id = lockId; + if (!isClosed() && id != null) { + try { + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + } + + public synchronized String currentLockHolder(String mainLock) { + final String id = mainLock; + if (!isClosed() && id != null) { + List<String> names; + try { + names = zookeeper.getChildren(id, false); + if (names.isEmpty()) + return ""; + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(id + "/" + name)); + } + return sortedNames.first().getName(); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + return "No lock holder!"; + } + + public synchronized void deleteLock(String mainLock) { + final String id = mainLock; + if (!isClosed() && id != null) { + try { + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + List<String> names = zookeeper.getChildren(id, false); + for (String name : names) { + zookeeper.delete(id + "/" + name, -1); + } + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.info("Caught: " + e, e); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.info("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + + } + + /** + * a zoookeeper operation that is mainly responsible for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** + * find if we have been created earlier if not create our node + * + * @param prefix the prefix node + * @param zookeeper the zookeeper client + * @param dir the dir parent + * @throws KeeperException + * @throws InterruptedException + */ + private String dir; + private String id = null; + + public String getId() { + return id; + } + + public LockZooKeeperOperation(String dir) { + this.dir = dir; + } + + public LockZooKeeperOperation(String dir, String id) { + this.dir = dir; + this.id = id; + } + + /** + * the command that is run and retried for actually obtaining the lock + * + * @return if the command was successful or not + */ + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + String prefix = "x-"; + byte[] data = {0x12, 0x34}; + id = zookeeper.create(dir + "/" + prefix, data, getAcl(), + CreateMode.PERSISTENT_SEQUENTIAL); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created id: " + id); + } + if (id != null) + break; + } + if (id != null) { + List<String> names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.info("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order + // ususally :) + ZNodeName idName = new ZNodeName(id); + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + if (!sortedNames.contains(idName)) + return Boolean.FALSE; + + SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + String lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, false); + if (stat != null) { + return Boolean.FALSE; + } else { + LOG.info("Could not find the" + " stats for less than me: " + + lastChildName.getName()); + } + } else + return Boolean.TRUE; + } + } + } while (id == null); + return Boolean.FALSE; + } + } + +} + diff --git a/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java b/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java new file mode 100644 index 00000000..7020d14d --- /dev/null +++ b/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import org.apache.zookeeper.KeeperException; + +/** + * A callback object which can be used for implementing retry-able operations in the + * {@link org.onap.music.lockingservice.ProtocolSupport} class + * + */ +public interface ZooKeeperOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + * @throws KeeperException FILL IN + * @throws InterruptedException FILL IN + */ + public boolean execute() throws KeeperException, InterruptedException; +} |