diff options
Diffstat (limited to 'jar/src/main/java/org/onap/music/lockingservice')
7 files changed, 1049 insertions, 0 deletions
diff --git a/jar/src/main/java/org/onap/music/lockingservice/LockListener.java b/jar/src/main/java/org/onap/music/lockingservice/LockListener.java new file mode 100644 index 00000000..33188e60 --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/LockListener.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +/** + * This class has two methods which are call back methods when a lock is acquired and when the lock + * is released. + * + */ +public interface LockListener { + /** + * call back called when the lock is acquired + */ + public void lockAcquired(); + + /** + * call back called when the lock is released. + */ + public void lockReleased(); +} diff --git a/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java b/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java new file mode 100644 index 00000000..6c31410f --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; + +// the state variable that will be stored in zookeeper, capturing the transitions of +public class MusicLockState implements Serializable { + public enum LockStatus { + UNLOCKED, BEING_LOCKED, LOCKED + };// captures the state of the lock + + private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicLockState.class); + LockStatus lockStatus; + boolean needToSyncQuorum = false; + String lockHolder; + long leasePeriod = Long.MAX_VALUE, leaseStartTime = -1; + + private String errorMessage = null; + + public MusicLockState(String errorMessage) { + this.errorMessage = errorMessage; + } + + public MusicLockState(LockStatus lockStatus, String lockHolder) { + this.lockStatus = lockStatus; + this.lockHolder = lockHolder; + } + + public MusicLockState(LockStatus lockStatus, String lockHolder, boolean needToSyncQuorum) { + this.lockStatus = lockStatus; + this.lockHolder = lockHolder; + this.needToSyncQuorum = needToSyncQuorum; + } + + + public long getLeasePeriod() { + return leasePeriod; + } + + public boolean isNeedToSyncQuorum() { + return needToSyncQuorum; + } + + + + public void setLeasePeriod(long leasePeriod) { + this.leasePeriod = leasePeriod; + } + + + public long getLeaseStartTime() { + return leaseStartTime; + } + + + public void setLeaseStartTime(long leaseStartTime) { + this.leaseStartTime = leaseStartTime; + } + + + + public LockStatus getLockStatus() { + return lockStatus; + } + + public void setLockStatus(LockStatus lockStatus) { + this.lockStatus = lockStatus; + } + + public String getLockHolder() { + return lockHolder; + } + + public void setLockHolder(String lockHolder) { + this.lockHolder = lockHolder; + } + + public String getErrorMessage() { + return errorMessage; + } + + public byte[] serialize() { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = null; + try { + out = new ObjectOutputStream(bos); + out.writeObject(this); + } catch (IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR); + } + return bos.toByteArray(); + } + + public static MusicLockState deSerialize(byte[] data) { + ByteArrayInputStream bis = new ByteArrayInputStream(data); + Object o = null; + ObjectInput in = null; + try { + in = new ObjectInputStream(bis); + o = in.readObject(); + } catch (ClassNotFoundException | IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.UNKNOWN); + } + return (MusicLockState) o; + } +} diff --git a/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java b/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java new file mode 100644 index 00000000..ae026903 --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java @@ -0,0 +1,166 @@ +/* + * ============LICENSE_START========================================== org.onap.music + * =================================================================== Copyright (c) 2017 AT&T + * Intellectual Property =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + + +import java.io.IOException; +import java.util.StringTokenizer; +import java.util.concurrent.CountDownLatch; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; +import org.onap.music.exceptions.MusicLockingException; +import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicUtil; + + +public class MusicLockingService implements Watcher { + + + private static final int SESSION_TIMEOUT = 180000; + ZkStatelessLockService zkLockHandle = null; + private CountDownLatch connectedSignal = new CountDownLatch(1); + private static EELFLoggerDelegate logger = + EELFLoggerDelegate.getLogger(MusicLockingService.class); + + public MusicLockingService() throws MusicServiceException { + try { + ZooKeeper zk = new ZooKeeper(MusicUtil.getMyZkHost(), SESSION_TIMEOUT, this); + connectedSignal.await(); + zkLockHandle = new ZkStatelessLockService(zk); + } catch (IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR); + throw new MusicServiceException("IO Error has occured" + e.getMessage()); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + throw new MusicServiceException("Exception Occured " + e.getMessage()); + } + } + + public ZkStatelessLockService getzkLockHandle() { + return zkLockHandle; + } + + public MusicLockingService(String lockServer) { + try { + ZooKeeper zk = new ZooKeeper(lockServer, SESSION_TIMEOUT, this); + connectedSignal.await(); + zkLockHandle = new ZkStatelessLockService(zk); + } catch (IOException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR); + }catch( InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch(Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + } + + public void createLockaIfItDoesNotExist(String lockName) { + if (!zkLockHandle.checkIfLockExists(lockName)) { + String lockHolder = null; + MusicLockState ml = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder); + byte[] data = ml.serialize(); + zkLockHandle.createLock(lockName, data); + } + } + + public void setLockState(String lockName, MusicLockState mls) { + byte[] data = mls.serialize(); + zkLockHandle.setNodeData(lockName, data); + } + + public MusicLockState getLockState(String lockName) throws MusicLockingException { + + byte[] data = null; + try{ + data = zkLockHandle.getNodeData(lockName); + }catch (Exception ex){ + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + if(data !=null) + return MusicLockState.deSerialize(data); + else { + logger.error(EELFLoggerDelegate.errorLogger,"",AppMessages.INVALIDLOCK, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + throw new MusicLockingException("Invalid lock or acquire failed"); + } + } + + public String createLockId(String lockName) { + String lockIdWithSlash = zkLockHandle.createLockId(lockName); + return lockIdWithSlash.replace('/', '$'); + } + + public boolean isMyTurn(String lockIdWithDollar) { + String lockId = lockIdWithDollar.replace('$', '/'); + StringTokenizer st = new StringTokenizer(lockId); + String lockName = "/" + st.nextToken("/"); + try { + return zkLockHandle.lock(lockName, lockId); + } catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.LOCKINGERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch( InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch(Exception e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + return false; + } + + public void unlockAndDeleteId(String lockIdWithDollar) throws KeeperException.NoNodeException { + String lockId = lockIdWithDollar.replace('$', '/'); + zkLockHandle.unlock(lockId); + } + + public void deleteLock(String lockName) throws MusicLockingException { + if(lockIdExists(lockName)) + zkLockHandle.deleteLock(lockName); + else{ + throw new MusicLockingException("Lock does not exist.Please check the lock: " + lockName + " and try again"); + } + } + + public String whoseTurnIsIt(String lockName) { + String lockHolder = zkLockHandle.currentLockHolder(lockName); + return lockHolder.replace('/', '$'); + + } + + public void process(WatchedEvent event) { // Watcher interface + if (event.getState() == KeeperState.SyncConnected) { + connectedSignal.countDown(); + } + } + + + public void close() { + zkLockHandle.close(); + } + + public boolean lockIdExists(String lockIdWithDollar) { + String lockId = lockIdWithDollar.replace('$', '/'); + return zkLockHandle.checkIfLockExists(lockId); + } + +} diff --git a/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java b/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java new file mode 100644 index 00000000..4082b3b8 --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java @@ -0,0 +1,208 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; +import org.onap.music.lockingservice.ZooKeeperOperation; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for protocol implementations which provides a number of higher level helper methods + * for working with ZooKeeper along with retrying synchronous operations if the connection to + * ZooKeeper closes such as {@link #retryOperation(ZooKeeperOperation)} + * + */ +class ProtocolSupport { + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ProtocolSupport.class); + + protected ZooKeeper zookeeper; + private AtomicBoolean closed = new AtomicBoolean(false); + private long retryDelay = 500L; + private int retryCount = 10; + private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + /** + * Closes this strategy and releases any ZooKeeper resources; but keeps the ZooKeeper instance + * open + */ + public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + /** + * return zookeeper client instance + * + * @return zookeeper client instance + */ + public ZooKeeper getZookeeper() { + return zookeeper; + } + + /** + * return the acl its using + * + * @return the acl. + */ + public List<ACL> getAcl() { + return acl; + } + + /** + * set the acl + * + * @param acl the acl to set to + */ + public void setAcl(List<ACL> acl) { + this.acl = acl; + } + + /** + * get the retry delay in milliseconds + * + * @return the retry delay + */ + public long getRetryDelay() { + return retryDelay; + } + + /** + * Sets the time waited between retry delays + * + * @param retryDelay the retry delay + */ + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + /** + * Allow derived classes to perform some custom closing operations to release resources + */ + protected void doClose() { + throw new UnsupportedOperationException(); + } + + + /** + * Perform the given operation, retrying if the connection fails + * + * @return object. it needs to be cast to the callee's expected return type. + * @param operation FILL IN + * @throws KeeperException FILL IN + * @throws InterruptedException FILL IN + */ + protected Object retryOperation(ZooKeeperOperation operation) + throws KeeperException, InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return operation.execute(); + } catch (KeeperException.SessionExpiredException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONEXPIRED+" for: " + zookeeper + " so reconnecting due to: " + e, ErrorSeverity.ERROR, ErrorTypes.SESSIONEXPIRED); + throw e; + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.SESSIONEXPIRED); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Attempt " + i + " failed with connection loss so attempting to reconnect: " + e); + + retryDelay(i); + } + } + throw exception; + } + + /** + * Ensures that the given path exists with no data, the current ACL and no flags + * + * @param path the lock path + */ + protected void ensurePathExists(String path) { + ensureExists(path, null, acl, CreateMode.PERSISTENT); + } + + /** + * Ensures that the given path exists with the given data, ACL and flags + * + * @param path the lock path + * @param data the data + * @param acl list of ACLs applying to the path + * @param flags create mode flags + */ + protected void ensureExists(final String path, final byte[] data, final List<ACL> acl, + final CreateMode flags) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + Stat stat = zookeeper.exists(path, false); + if (stat != null) { + return true; + } + zookeeper.create(path, data, acl, flags); + return true; + } + }); + } catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + } + + /** + * Returns true if this protocol has been closed + * + * @return true if this protocol is closed + */ + protected boolean isClosed() { + return closed.get(); + } + + /** + * Performs a retry delay if this is not the first attempt + * + * @param attemptCount the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) { + if (attemptCount > 0) { + try { + Thread.sleep(attemptCount * retryDelay); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Thread failed to sleep: " + e); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java b/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java new file mode 100644 index 00000000..0c190f14 --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; + +/** + * Represents an ephemeral znode name which has an ordered sequence number and can be sorted in + * order + * + */ +class ZNodeName implements Comparable<ZNodeName> { + private final String name; + private String prefix; + private int sequence = -1; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Number format exception "+idx, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR); + } catch (ArrayIndexOutOfBoundsException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Array out of bounds for "+idx, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) + return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + public int compareTo(ZNodeName that) { + int answer = this.prefix.compareTo(that.prefix); + if (answer == 0) { + int s1 = this.sequence; + int s2 = that.sequence; + if (s1 == -1 && s2 == -1) { + return this.name.compareTo(that.name); + } + answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } +} diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java b/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java new file mode 100644 index 00000000..e99df255 --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java @@ -0,0 +1,339 @@ +/* + * ============LICENSE_START========================================== org.onap.music + * =================================================================== Copyright (c) 2017 AT&T + * Intellectual Property =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.eelf.logging.format.AppMessages; +import org.onap.music.eelf.logging.format.ErrorSeverity; +import org.onap.music.eelf.logging.format.ErrorTypes; + +/** + * A <a href="package.html">protocol to implement an exclusive write lock or to elect a leader</a>. + * <p/> + * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then + * or it may be some time later. + * <p/> + * You can register a listener so that you are invoked when you get the lock; otherwise you can ask + * if you have the lock by calling {@link #isOwner()} + * + */ +public class ZkStatelessLockService extends ProtocolSupport { + public ZkStatelessLockService(ZooKeeper zk) { + zookeeper = zk; + } + + private static EELFLoggerDelegate logger = + EELFLoggerDelegate.getLogger(ZkStatelessLockService.class); + + protected void createLock(final String path, final byte[] data) { + final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.create(path, data, acl, CreateMode.PERSISTENT); + return true; + } + }); + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + } + + public void close() { + try { + zookeeper.close(); + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + } + + public void setNodeData(final String lockName, final byte[] data) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.getSessionId(); + zookeeper.setData("/" + lockName, data, -1); + return true; + } + }); + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + + } + + public byte[] getNodeData(final String lockName) { + try { + if (zookeeper.exists("/" + lockName, null) != null) + return zookeeper.getData("/" + lockName, false, null); + else + return null; + + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + return null; + } + + public boolean checkIfLockExists(String lockName) { + boolean result = false; + try { + Stat stat = zookeeper.exists(lockName, false); + if (stat != null) { + result = true; + } + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + return result; + } + + public void createNode(String nodeName) { + ensurePathExists(nodeName); + } + + public String createLockId(String dir) { + ensurePathExists(dir); + LockZooKeeperOperation zop = new LockZooKeeperOperation(dir); + try { + retryOperation(zop); + }catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + }catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + } + return zop.getId(); + } + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was acquired. Note + * that the exclusive lock may be acquired some time later after this method has been invoked + * due to the current lock owner going away. + */ + public synchronized boolean lock(String dir, String lockId) + throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + LockZooKeeperOperation zop = new LockZooKeeperOperation(dir, lockId); + return (Boolean) retryOperation(zop); + } + + /** + * Removes the lock or associated znode if you no longer require the lock. this also removes + * your request in the queue for locking in case you do not already hold the lock. + * + * @throws RuntimeException throws a runtime exception if it cannot connect to zookeeper. + * @throws NoNodeException + */ + public synchronized void unlock(String lockId) throws RuntimeException, KeeperException.NoNodeException { + final String id = lockId; + if (!isClosed() && id != null) { + try { + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + throw new KeeperException.NoNodeException("Lock doesn't exists. Release lock operation failed."); + } catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + } + + public synchronized String currentLockHolder(String mainLock) { + final String id = mainLock; + if (!isClosed() && id != null) { + List<String> names; + try { + names = zookeeper.getChildren(id, false); + if (names.isEmpty()) + return ""; + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(id + "/" + name)); + } + return sortedNames.first().getName(); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + return "No lock holder!"; + } + + public synchronized void deleteLock(String mainLock) { + final String id = mainLock; + if (!isClosed() && id != null) { + try { + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + List<String> names = zookeeper.getChildren(id, false); + for (String name : names) { + zookeeper.delete(id + "/" + name, -1); + } + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + // do nothing + } catch (KeeperException e) { + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } + } + + } + + /** + * a zoookeeper operation that is mainly responsible for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** + * find if we have been created earlier if not create our node + * + * @param prefix the prefix node + * @param zookeeper the zookeeper client + * @param dir the dir parent + * @throws KeeperException + * @throws InterruptedException + */ + private String dir; + private String id = null; + + public String getId() { + return id; + } + + public LockZooKeeperOperation(String dir) { + this.dir = dir; + } + + public LockZooKeeperOperation(String dir, String id) { + this.dir = dir; + this.id = id; + } + + /** + * the command that is run and retried for actually obtaining the lock + * + * @return if the command was successful or not + */ + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + String prefix = "x-"; + byte[] data = {0x12, 0x34}; + id = zookeeper.create(dir + "/" + prefix, data, getAcl(), + CreateMode.PERSISTENT_SEQUENTIAL); + + if (logger.isDebugEnabled()) { + logger.debug(EELFLoggerDelegate.debugLogger, "Created id: " + id); + } + if (id != null) + break; + } + if (id != null) { + List<String> names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + logger.info(EELFLoggerDelegate.applicationLogger, "No children in: " + dir + + " when we've just " + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + return Boolean.FALSE; + + } else { + // lets sort them explicitly (though they do seem to come back in order + // ususally :) + ZNodeName idName = new ZNodeName(id); + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + if (!sortedNames.contains(idName)) + return Boolean.FALSE; + + SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + String lastChildId = lastChildName.getName(); + if (logger.isDebugEnabled()) { + logger.debug(EELFLoggerDelegate.debugLogger, "watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, false); + if (stat != null) { + return Boolean.FALSE; + } else { + logger.info(EELFLoggerDelegate.applicationLogger, + "Could not find the" + " stats for less than me: " + + lastChildName.getName()); + } + } else + return Boolean.TRUE; + } + } + } while (id == null); + return Boolean.FALSE; + } + } + +} + diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java b/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java new file mode 100644 index 00000000..7020d14d --- /dev/null +++ b/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.lockingservice; + +import org.apache.zookeeper.KeeperException; + +/** + * A callback object which can be used for implementing retry-able operations in the + * {@link org.onap.music.lockingservice.ProtocolSupport} class + * + */ +public interface ZooKeeperOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + * @throws KeeperException FILL IN + * @throws InterruptedException FILL IN + */ + public boolean execute() throws KeeperException, InterruptedException; +} |