aboutsummaryrefslogtreecommitdiffstats
path: root/jar/src/main/java/org/onap/music/lockingservice
diff options
context:
space:
mode:
Diffstat (limited to 'jar/src/main/java/org/onap/music/lockingservice')
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/LockListener.java39
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java137
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java166
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java208
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java118
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java339
-rw-r--r--jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java42
7 files changed, 1049 insertions, 0 deletions
diff --git a/jar/src/main/java/org/onap/music/lockingservice/LockListener.java b/jar/src/main/java/org/onap/music/lockingservice/LockListener.java
new file mode 100644
index 00000000..33188e60
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/LockListener.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+/**
+ * This class has two methods which are call back methods when a lock is acquired and when the lock
+ * is released.
+ *
+ */
+public interface LockListener {
+ /**
+ * call back called when the lock is acquired
+ */
+ public void lockAcquired();
+
+ /**
+ * call back called when the lock is released.
+ */
+ public void lockReleased();
+}
diff --git a/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java b/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java
new file mode 100644
index 00000000..6c31410f
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/MusicLockState.java
@@ -0,0 +1,137 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+
+// the state variable that will be stored in zookeeper, capturing the transitions of
+public class MusicLockState implements Serializable {
+ public enum LockStatus {
+ UNLOCKED, BEING_LOCKED, LOCKED
+ };// captures the state of the lock
+
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicLockState.class);
+ LockStatus lockStatus;
+ boolean needToSyncQuorum = false;
+ String lockHolder;
+ long leasePeriod = Long.MAX_VALUE, leaseStartTime = -1;
+
+ private String errorMessage = null;
+
+ public MusicLockState(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ public MusicLockState(LockStatus lockStatus, String lockHolder) {
+ this.lockStatus = lockStatus;
+ this.lockHolder = lockHolder;
+ }
+
+ public MusicLockState(LockStatus lockStatus, String lockHolder, boolean needToSyncQuorum) {
+ this.lockStatus = lockStatus;
+ this.lockHolder = lockHolder;
+ this.needToSyncQuorum = needToSyncQuorum;
+ }
+
+
+ public long getLeasePeriod() {
+ return leasePeriod;
+ }
+
+ public boolean isNeedToSyncQuorum() {
+ return needToSyncQuorum;
+ }
+
+
+
+ public void setLeasePeriod(long leasePeriod) {
+ this.leasePeriod = leasePeriod;
+ }
+
+
+ public long getLeaseStartTime() {
+ return leaseStartTime;
+ }
+
+
+ public void setLeaseStartTime(long leaseStartTime) {
+ this.leaseStartTime = leaseStartTime;
+ }
+
+
+
+ public LockStatus getLockStatus() {
+ return lockStatus;
+ }
+
+ public void setLockStatus(LockStatus lockStatus) {
+ this.lockStatus = lockStatus;
+ }
+
+ public String getLockHolder() {
+ return lockHolder;
+ }
+
+ public void setLockHolder(String lockHolder) {
+ this.lockHolder = lockHolder;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public byte[] serialize() {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = null;
+ try {
+ out = new ObjectOutputStream(bos);
+ out.writeObject(this);
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
+ }
+ return bos.toByteArray();
+ }
+
+ public static MusicLockState deSerialize(byte[] data) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ Object o = null;
+ ObjectInput in = null;
+ try {
+ in = new ObjectInputStream(bis);
+ o = in.readObject();
+ } catch (ClassNotFoundException | IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.UNKNOWN);
+ }
+ return (MusicLockState) o;
+ }
+}
diff --git a/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java b/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java
new file mode 100644
index 00000000..ae026903
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/MusicLockingService.java
@@ -0,0 +1,166 @@
+/*
+ * ============LICENSE_START========================================== org.onap.music
+ * =================================================================== Copyright (c) 2017 AT&T
+ * Intellectual Property ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.main.MusicUtil;
+
+
+public class MusicLockingService implements Watcher {
+
+
+ private static final int SESSION_TIMEOUT = 180000;
+ ZkStatelessLockService zkLockHandle = null;
+ private CountDownLatch connectedSignal = new CountDownLatch(1);
+ private static EELFLoggerDelegate logger =
+ EELFLoggerDelegate.getLogger(MusicLockingService.class);
+
+ public MusicLockingService() throws MusicServiceException {
+ try {
+ ZooKeeper zk = new ZooKeeper(MusicUtil.getMyZkHost(), SESSION_TIMEOUT, this);
+ connectedSignal.await();
+ zkLockHandle = new ZkStatelessLockService(zk);
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
+ throw new MusicServiceException("IO Error has occured" + e.getMessage());
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ throw new MusicServiceException("Exception Occured " + e.getMessage());
+ }
+ }
+
+ public ZkStatelessLockService getzkLockHandle() {
+ return zkLockHandle;
+ }
+
+ public MusicLockingService(String lockServer) {
+ try {
+ ZooKeeper zk = new ZooKeeper(lockServer, SESSION_TIMEOUT, this);
+ connectedSignal.await();
+ zkLockHandle = new ZkStatelessLockService(zk);
+ } catch (IOException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.IOERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
+ }catch( InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch(Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ public void createLockaIfItDoesNotExist(String lockName) {
+ if (!zkLockHandle.checkIfLockExists(lockName)) {
+ String lockHolder = null;
+ MusicLockState ml = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder);
+ byte[] data = ml.serialize();
+ zkLockHandle.createLock(lockName, data);
+ }
+ }
+
+ public void setLockState(String lockName, MusicLockState mls) {
+ byte[] data = mls.serialize();
+ zkLockHandle.setNodeData(lockName, data);
+ }
+
+ public MusicLockState getLockState(String lockName) throws MusicLockingException {
+
+ byte[] data = null;
+ try{
+ data = zkLockHandle.getNodeData(lockName);
+ }catch (Exception ex){
+ logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ if(data !=null)
+ return MusicLockState.deSerialize(data);
+ else {
+ logger.error(EELFLoggerDelegate.errorLogger,"",AppMessages.INVALIDLOCK, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ throw new MusicLockingException("Invalid lock or acquire failed");
+ }
+ }
+
+ public String createLockId(String lockName) {
+ String lockIdWithSlash = zkLockHandle.createLockId(lockName);
+ return lockIdWithSlash.replace('/', '$');
+ }
+
+ public boolean isMyTurn(String lockIdWithDollar) {
+ String lockId = lockIdWithDollar.replace('$', '/');
+ StringTokenizer st = new StringTokenizer(lockId);
+ String lockName = "/" + st.nextToken("/");
+ try {
+ return zkLockHandle.lock(lockName, lockId);
+ } catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.LOCKINGERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch( InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch(Exception e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ return false;
+ }
+
+ public void unlockAndDeleteId(String lockIdWithDollar) throws KeeperException.NoNodeException {
+ String lockId = lockIdWithDollar.replace('$', '/');
+ zkLockHandle.unlock(lockId);
+ }
+
+ public void deleteLock(String lockName) throws MusicLockingException {
+ if(lockIdExists(lockName))
+ zkLockHandle.deleteLock(lockName);
+ else{
+ throw new MusicLockingException("Lock does not exist.Please check the lock: " + lockName + " and try again");
+ }
+ }
+
+ public String whoseTurnIsIt(String lockName) {
+ String lockHolder = zkLockHandle.currentLockHolder(lockName);
+ return lockHolder.replace('/', '$');
+
+ }
+
+ public void process(WatchedEvent event) { // Watcher interface
+ if (event.getState() == KeeperState.SyncConnected) {
+ connectedSignal.countDown();
+ }
+ }
+
+
+ public void close() {
+ zkLockHandle.close();
+ }
+
+ public boolean lockIdExists(String lockIdWithDollar) {
+ String lockId = lockIdWithDollar.replace('$', '/');
+ return zkLockHandle.checkIfLockExists(lockId);
+ }
+
+}
diff --git a/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java b/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java
new file mode 100644
index 00000000..4082b3b8
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/ProtocolSupport.java
@@ -0,0 +1,208 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+import org.onap.music.lockingservice.ZooKeeperOperation;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for protocol implementations which provides a number of higher level helper methods
+ * for working with ZooKeeper along with retrying synchronous operations if the connection to
+ * ZooKeeper closes such as {@link #retryOperation(ZooKeeperOperation)}
+ *
+ */
+class ProtocolSupport {
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ProtocolSupport.class);
+
+ protected ZooKeeper zookeeper;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ private long retryDelay = 500L;
+ private int retryCount = 10;
+ private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ /**
+ * Closes this strategy and releases any ZooKeeper resources; but keeps the ZooKeeper instance
+ * open
+ */
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ doClose();
+ }
+ }
+
+ /**
+ * return zookeeper client instance
+ *
+ * @return zookeeper client instance
+ */
+ public ZooKeeper getZookeeper() {
+ return zookeeper;
+ }
+
+ /**
+ * return the acl its using
+ *
+ * @return the acl.
+ */
+ public List<ACL> getAcl() {
+ return acl;
+ }
+
+ /**
+ * set the acl
+ *
+ * @param acl the acl to set to
+ */
+ public void setAcl(List<ACL> acl) {
+ this.acl = acl;
+ }
+
+ /**
+ * get the retry delay in milliseconds
+ *
+ * @return the retry delay
+ */
+ public long getRetryDelay() {
+ return retryDelay;
+ }
+
+ /**
+ * Sets the time waited between retry delays
+ *
+ * @param retryDelay the retry delay
+ */
+ public void setRetryDelay(long retryDelay) {
+ this.retryDelay = retryDelay;
+ }
+
+ /**
+ * Allow derived classes to perform some custom closing operations to release resources
+ */
+ protected void doClose() {
+ throw new UnsupportedOperationException();
+ }
+
+
+ /**
+ * Perform the given operation, retrying if the connection fails
+ *
+ * @return object. it needs to be cast to the callee's expected return type.
+ * @param operation FILL IN
+ * @throws KeeperException FILL IN
+ * @throws InterruptedException FILL IN
+ */
+ protected Object retryOperation(ZooKeeperOperation operation)
+ throws KeeperException, InterruptedException {
+ KeeperException exception = null;
+ for (int i = 0; i < retryCount; i++) {
+ try {
+ return operation.execute();
+ } catch (KeeperException.SessionExpiredException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONEXPIRED+" for: " + zookeeper + " so reconnecting due to: " + e, ErrorSeverity.ERROR, ErrorTypes.SESSIONEXPIRED);
+ throw e;
+ } catch (KeeperException.ConnectionLossException e) {
+ if (exception == null) {
+ exception = e;
+ }
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.SESSIONEXPIRED);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Attempt " + i + " failed with connection loss so attempting to reconnect: " + e);
+
+ retryDelay(i);
+ }
+ }
+ throw exception;
+ }
+
+ /**
+ * Ensures that the given path exists with no data, the current ACL and no flags
+ *
+ * @param path the lock path
+ */
+ protected void ensurePathExists(String path) {
+ ensureExists(path, null, acl, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Ensures that the given path exists with the given data, ACL and flags
+ *
+ * @param path the lock path
+ * @param data the data
+ * @param acl list of ACLs applying to the path
+ * @param flags create mode flags
+ */
+ protected void ensureExists(final String path, final byte[] data, final List<ACL> acl,
+ final CreateMode flags) {
+ try {
+ retryOperation(new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ Stat stat = zookeeper.exists(path, false);
+ if (stat != null) {
+ return true;
+ }
+ zookeeper.create(path, data, acl, flags);
+ return true;
+ }
+ });
+ } catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ /**
+ * Returns true if this protocol has been closed
+ *
+ * @return true if this protocol is closed
+ */
+ protected boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * Performs a retry delay if this is not the first attempt
+ *
+ * @param attemptCount the number of the attempts performed so far
+ */
+ protected void retryDelay(int attemptCount) {
+ if (attemptCount > 0) {
+ try {
+ Thread.sleep(attemptCount * retryDelay);
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Thread failed to sleep: " + e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java b/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java
new file mode 100644
index 00000000..0c190f14
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/ZNodeName.java
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number and can be sorted in
+ * order
+ *
+ */
+class ZNodeName implements Comparable<ZNodeName> {
+ private final String name;
+ private String prefix;
+ private int sequence = -1;
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(ZNodeName.class);
+
+ public ZNodeName(String name) {
+ if (name == null) {
+ throw new NullPointerException("id cannot be null");
+ }
+ this.name = name;
+ this.prefix = name;
+ int idx = name.lastIndexOf('-');
+ if (idx >= 0) {
+ this.prefix = name.substring(0, idx);
+ try {
+ this.sequence = Integer.parseInt(name.substring(idx + 1));
+ // If an exception occurred we misdetected a sequence suffix,
+ // so return -1.
+ } catch (NumberFormatException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Number format exception "+idx, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),"Array out of bounds for "+idx, ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ZNodeName sequence = (ZNodeName) o;
+
+ if (!name.equals(sequence.name))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode() + 37;
+ }
+
+ public int compareTo(ZNodeName that) {
+ int answer = this.prefix.compareTo(that.prefix);
+ if (answer == 0) {
+ int s1 = this.sequence;
+ int s2 = that.sequence;
+ if (s1 == -1 && s2 == -1) {
+ return this.name.compareTo(that.name);
+ }
+ answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2;
+ }
+ return answer;
+ }
+
+ /**
+ * Returns the name of the znode
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the sequence number
+ */
+ public int getZNodeName() {
+ return sequence;
+ }
+
+ /**
+ * Returns the text prefix before the sequence number
+ */
+ public String getPrefix() {
+ return prefix;
+ }
+}
diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java b/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java
new file mode 100644
index 00000000..e99df255
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java
@@ -0,0 +1,339 @@
+/*
+ * ============LICENSE_START========================================== org.onap.music
+ * =================================================================== Copyright (c) 2017 AT&T
+ * Intellectual Property ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.eelf.logging.format.AppMessages;
+import org.onap.music.eelf.logging.format.ErrorSeverity;
+import org.onap.music.eelf.logging.format.ErrorTypes;
+
+/**
+ * A <a href="package.html">protocol to implement an exclusive write lock or to elect a leader</a>.
+ * <p/>
+ * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then
+ * or it may be some time later.
+ * <p/>
+ * You can register a listener so that you are invoked when you get the lock; otherwise you can ask
+ * if you have the lock by calling {@link #isOwner()}
+ *
+ */
+public class ZkStatelessLockService extends ProtocolSupport {
+ public ZkStatelessLockService(ZooKeeper zk) {
+ zookeeper = zk;
+ }
+
+ private static EELFLoggerDelegate logger =
+ EELFLoggerDelegate.getLogger(ZkStatelessLockService.class);
+
+ protected void createLock(final String path, final byte[] data) {
+ final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ try {
+ retryOperation(new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ zookeeper.create(path, data, acl, CreateMode.PERSISTENT);
+ return true;
+ }
+ });
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ public void close() {
+ try {
+ zookeeper.close();
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ }
+
+ public void setNodeData(final String lockName, final byte[] data) {
+ try {
+ retryOperation(new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ zookeeper.getSessionId();
+ zookeeper.setData("/" + lockName, data, -1);
+ return true;
+ }
+ });
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+
+ }
+
+ public byte[] getNodeData(final String lockName) {
+ try {
+ if (zookeeper.exists("/" + lockName, null) != null)
+ return zookeeper.getData("/" + lockName, false, null);
+ else
+ return null;
+
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ return null;
+ }
+
+ public boolean checkIfLockExists(String lockName) {
+ boolean result = false;
+ try {
+ Stat stat = zookeeper.exists(lockName, false);
+ if (stat != null) {
+ result = true;
+ }
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ return result;
+ }
+
+ public void createNode(String nodeName) {
+ ensurePathExists(nodeName);
+ }
+
+ public String createLockId(String dir) {
+ ensurePathExists(dir);
+ LockZooKeeperOperation zop = new LockZooKeeperOperation(dir);
+ try {
+ retryOperation(zop);
+ }catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ }
+ return zop.getId();
+ }
+
+ /**
+ * Attempts to acquire the exclusive write lock returning whether or not it was acquired. Note
+ * that the exclusive lock may be acquired some time later after this method has been invoked
+ * due to the current lock owner going away.
+ */
+ public synchronized boolean lock(String dir, String lockId)
+ throws KeeperException, InterruptedException {
+ if (isClosed()) {
+ return false;
+ }
+ LockZooKeeperOperation zop = new LockZooKeeperOperation(dir, lockId);
+ return (Boolean) retryOperation(zop);
+ }
+
+ /**
+ * Removes the lock or associated znode if you no longer require the lock. this also removes
+ * your request in the queue for locking in case you do not already hold the lock.
+ *
+ * @throws RuntimeException throws a runtime exception if it cannot connect to zookeeper.
+ * @throws NoNodeException
+ */
+ public synchronized void unlock(String lockId) throws RuntimeException, KeeperException.NoNodeException {
+ final String id = lockId;
+ if (!isClosed() && id != null) {
+ try {
+ ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ zookeeper.delete(id, -1);
+ return Boolean.TRUE;
+ }
+ };
+ zopdel.execute();
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ // set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // do nothing
+ throw new KeeperException.NoNodeException("Lock doesn't exists. Release lock operation failed.");
+ } catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
+ }
+ }
+ }
+
+ public synchronized String currentLockHolder(String mainLock) {
+ final String id = mainLock;
+ if (!isClosed() && id != null) {
+ List<String> names;
+ try {
+ names = zookeeper.getChildren(id, false);
+ if (names.isEmpty())
+ return "";
+ SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+ for (String name : names) {
+ sortedNames.add(new ZNodeName(id + "/" + name));
+ }
+ return sortedNames.first().getName();
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ // set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // do nothing
+ } catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
+ }
+ }
+ return "No lock holder!";
+ }
+
+ public synchronized void deleteLock(String mainLock) {
+ final String id = mainLock;
+ if (!isClosed() && id != null) {
+ try {
+ ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException, InterruptedException {
+ List<String> names = zookeeper.getChildren(id, false);
+ for (String name : names) {
+ zookeeper.delete(id + "/" + name, -1);
+ }
+ zookeeper.delete(id, -1);
+ return Boolean.TRUE;
+ }
+ };
+ zopdel.execute();
+ } catch (InterruptedException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ // set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ // do nothing
+ } catch (KeeperException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.KEEPERERROR, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
+ throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
+ }
+ }
+
+ }
+
+ /**
+ * a zoookeeper operation that is mainly responsible for all the magic required for locking.
+ */
+ private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+ /**
+ * find if we have been created earlier if not create our node
+ *
+ * @param prefix the prefix node
+ * @param zookeeper the zookeeper client
+ * @param dir the dir parent
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private String dir;
+ private String id = null;
+
+ public String getId() {
+ return id;
+ }
+
+ public LockZooKeeperOperation(String dir) {
+ this.dir = dir;
+ }
+
+ public LockZooKeeperOperation(String dir, String id) {
+ this.dir = dir;
+ this.id = id;
+ }
+
+ /**
+ * the command that is run and retried for actually obtaining the lock
+ *
+ * @return if the command was successful or not
+ */
+ public boolean execute() throws KeeperException, InterruptedException {
+ do {
+ if (id == null) {
+ String prefix = "x-";
+ byte[] data = {0x12, 0x34};
+ id = zookeeper.create(dir + "/" + prefix, data, getAcl(),
+ CreateMode.PERSISTENT_SEQUENTIAL);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(EELFLoggerDelegate.debugLogger, "Created id: " + id);
+ }
+ if (id != null)
+ break;
+ }
+ if (id != null) {
+ List<String> names = zookeeper.getChildren(dir, false);
+ if (names.isEmpty()) {
+ logger.info(EELFLoggerDelegate.applicationLogger, "No children in: " + dir
+ + " when we've just " + "created one! Lets recreate it...");
+ // lets force the recreation of the id
+ id = null;
+ return Boolean.FALSE;
+
+ } else {
+ // lets sort them explicitly (though they do seem to come back in order
+ // ususally :)
+ ZNodeName idName = new ZNodeName(id);
+ SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+ for (String name : names) {
+ sortedNames.add(new ZNodeName(dir + "/" + name));
+ }
+ if (!sortedNames.contains(idName))
+ return Boolean.FALSE;
+
+ SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+ if (!lessThanMe.isEmpty()) {
+ ZNodeName lastChildName = lessThanMe.last();
+ String lastChildId = lastChildName.getName();
+ if (logger.isDebugEnabled()) {
+ logger.debug(EELFLoggerDelegate.debugLogger, "watching less than me node: " + lastChildId);
+ }
+ Stat stat = zookeeper.exists(lastChildId, false);
+ if (stat != null) {
+ return Boolean.FALSE;
+ } else {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Could not find the" + " stats for less than me: "
+ + lastChildName.getName());
+ }
+ } else
+ return Boolean.TRUE;
+ }
+ }
+ } while (id == null);
+ return Boolean.FALSE;
+ }
+ }
+
+}
+
diff --git a/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java b/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java
new file mode 100644
index 00000000..7020d14d
--- /dev/null
+++ b/jar/src/main/java/org/onap/music/lockingservice/ZooKeeperOperation.java
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.lockingservice;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.onap.music.lockingservice.ProtocolSupport} class
+ *
+ */
+public interface ZooKeeperOperation {
+
+ /**
+ * Performs the operation - which may be involved multiple times if the connection
+ * to ZooKeeper closes during this operation
+ *
+ * @return the result of the operation or null
+ * @throws KeeperException FILL IN
+ * @throws InterruptedException FILL IN
+ */
+ public boolean execute() throws KeeperException, InterruptedException;
+}