diff options
Diffstat (limited to 'plugins/plugins-context/context-locking/context-locking-curator/src')
5 files changed, 559 insertions, 0 deletions
diff --git a/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockFacade.java b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockFacade.java new file mode 100644 index 000000000..928255031 --- /dev/null +++ b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockFacade.java @@ -0,0 +1,137 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.context.locking.curator; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class provides a facade over the {@link Lock} interface for Curator locks. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class CuratorLockFacade implements Lock { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockFacade.class); + + // The Lock ID + private final String lockId; + + // The mutex used for Curator locking + private final InterProcessMutex lockMutex; + + /** + * Create the lock Facade. + * + * @param lockMutex The lock mutex behind the facade + * @param lockId The ID of the lock + */ + public CuratorLockFacade(final InterProcessMutex lockMutex, final String lockId) { + this.lockId = lockId; + this.lockMutex = lockMutex; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#lock() + */ + @Override + public void lock() { + try { + lockMutex.acquire(); + } catch (final Exception e) { + LOGGER.warn("failed to acquire lock for \"" + lockId, e); + } + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#lockInterruptibly() + */ + @Override + public void lockInterruptibly() throws InterruptedException { + LOGGER.warn("lockInterruptibly() not supported for \"" + lockId); + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#tryLock() + */ + @Override + public boolean tryLock() { + try { + lockMutex.acquire(); + return true; + } catch (final Exception e) { + LOGGER.warn("failed to acquire lock for \"" + lockId, e); + return false; + } + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit) + */ + @Override + public boolean tryLock(final long time, final TimeUnit unit) throws InterruptedException { + try { + lockMutex.acquire(time, unit); + return true; + } catch (final Exception e) { + LOGGER.warn("failed to acquire lock for \"" + lockId, e); + return false; + } + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#unlock() + */ + @Override + public void unlock() { + try { + lockMutex.release(); + } catch (final Exception e) { + LOGGER.warn("failed to release lock for \"" + lockId, e); + } + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.Lock#newCondition() + */ + @Override + public Condition newCondition() { + LOGGER.warn("newCondition() not supported for \"" + lockId); + return null; + } +} diff --git a/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManager.java b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManager.java new file mode 100644 index 000000000..d0cc842fd --- /dev/null +++ b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManager.java @@ -0,0 +1,185 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.context.locking.curator; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.CreateMode; +import org.onap.policy.apex.context.ContextException; +import org.onap.policy.apex.context.impl.locking.AbstractLockManager; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.service.ParameterService; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class CuratorLockManager manages the Curator interface towards Zookeeper for administering + * the Apex Context Album instance locks.. + */ +public class CuratorLockManager extends AbstractLockManager { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(CuratorLockManager.class); + + // The Curator framework used for locking + private CuratorFramework curatorFramework; + + // The address of the Zookeeper server + private String curatorZookeeperAddress; + + /** + * Constructor, set up a lock manager that uses Curator locking. + * + * @throws ContextException On errors connecting to Curator + */ + public CuratorLockManager() throws ContextException { + LOGGER.entry("CuratorLockManager(): setting up the Curator lock manager . . ."); + + LOGGER.exit("CuratorLockManager(): Curator lock manager set up"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.context.impl.locking.AbstractLockManager#init(org.onap.policy.apex. + * model. basicmodel.concepts.AxArtifactKey) + */ + @Override + public void init(final AxArtifactKey key) throws ContextException { + LOGGER.entry("init(" + key + ")"); + + super.init(key); + + // Get the lock manager parameters + final CuratorLockManagerParameters lockParameters = + ParameterService.getParameters(CuratorLockManagerParameters.class); + + // Check if the curator address has been set + curatorZookeeperAddress = lockParameters.getZookeeperAddress(); + if (curatorZookeeperAddress == null || curatorZookeeperAddress.trim().length() == 0) { + LOGGER.warn( + "could not set up Curator locking, check if the curator Zookeeper address parameter is set correctly"); + throw new ContextException( + "could not set up Curator locking, check if the curator Zookeeper address parameter is set correctly"); + } + + // Set up the curator framework we'll use + curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorZookeeperAddress) + .retryPolicy(new ExponentialBackoffRetry(lockParameters.getZookeeperConnectSleepTime(), + lockParameters.getZookeeperContextRetries())) + .build(); + + // Listen for changes on the Curator connection + curatorFramework.getConnectionStateListenable().addListener(new CuratorManagerConnectionStateListener()); + + // Start the framework and specify Ephemeral nodes + curatorFramework.start(); + + // Wait for the connection to be made + try { + curatorFramework.blockUntilConnected( + lockParameters.getZookeeperConnectSleepTime() * lockParameters.getZookeeperContextRetries(), + TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + LOGGER.warn("could not connect to Zookeeper server at \"" + curatorZookeeperAddress + + "\", wait for connection timed out"); + throw new ContextException("could not connect to Zookeeper server at \"" + curatorZookeeperAddress + + "\", wait for connection timed out"); + } + + if (!curatorFramework.getZookeeperClient().isConnected()) { + LOGGER.warn("could not connect to Zookeeper server at \"" + curatorZookeeperAddress + + "\", see error log for details"); + throw new ContextException("could not connect to Zookeeper server at \"" + curatorZookeeperAddress + + "\", see error log for details"); + } + + // We'll use Ephemeral nodes for locks on the Zookeeper server + curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL); + + LOGGER.exit("init(" + key + "," + lockParameters + ")"); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.core.context.impl.locking.AbstractLockManager#getReentrantReadWriteLock( + * java.lang.String) + */ + @Override + public ReadWriteLock getReentrantReadWriteLock(final String lockId) throws ContextException { + // Check if the framework is active + if (curatorFramework != null && curatorFramework.getZookeeperClient().isConnected()) { + return new CuratorReentrantReadWriteLock(curatorFramework, "/" + lockId); + } else { + throw new ContextException("creation of lock using Zookeeper server at \"" + curatorZookeeperAddress + + "\", failed, see error log for details"); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.context.LockManager#shutdown() + */ + @Override + public void shutdown() { + if (curatorFramework == null) { + return; + } + CloseableUtils.closeQuietly(curatorFramework); + curatorFramework = null; + } + + /** + * This class is a callback class for state changes on the curator to Zookeeper connection. + */ + private class CuratorManagerConnectionStateListener implements ConnectionStateListener { + + /* + * (non-Javadoc) + * + * @see org.apache.curator.framework.state.ConnectionStateListener#stateChanged(org.apache. + * curator.framework.CuratorFramework, org.apache.curator.framework.state.ConnectionState) + */ + @Override + public void stateChanged(final CuratorFramework incomngCuratorFramework, final ConnectionState newState) { + // Is the state changed for this curator framework? + if (!incomngCuratorFramework.equals(curatorFramework)) { + return; + } + + LOGGER.info("curator state of client \"" + curatorFramework + "\" connected to \"" + curatorZookeeperAddress + + "\" changed to " + newState); + + if (newState != ConnectionState.CONNECTED) { + shutdown(); + } + } + } +} diff --git a/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManagerParameters.java b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManagerParameters.java new file mode 100644 index 000000000..9e8d5d2af --- /dev/null +++ b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorLockManagerParameters.java @@ -0,0 +1,120 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.context.locking.curator; + +import org.onap.policy.apex.context.parameters.LockManagerParameters; +import org.onap.policy.apex.model.basicmodel.service.ParameterService; + +/** + * Bean class for Curator locking parameters. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class CuratorLockManagerParameters extends LockManagerParameters { + // @formatter:off + /** The default address used to connect to the Zookeeper server. */ + public static final String DEFAULT_ZOOKEEPER_ADDRESS = "localhost:2181"; + + /** The default sleep time to use when connecting to the Zookeeper server. */ + public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEP_TIME = 1000; + + /** The default number of times to retry failed connections to the Zookeeper server. */ + public static final int DEFAULT_ZOOKEEPER_CONNECT_RETRIES = 3; + + // Curator parameters + private String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS; + private int zookeeperConnectSleepTime = DEFAULT_ZOOKEEPER_CONNECT_SLEEP_TIME; + private int zookeeperContextRetries = DEFAULT_ZOOKEEPER_CONNECT_RETRIES; + // @formatter:on + + /** + * The Constructor. + */ + public CuratorLockManagerParameters() { + super(CuratorLockManagerParameters.class.getCanonicalName()); + ParameterService.registerParameters(CuratorLockManagerParameters.class, this); + } + + /** + * Gets the zookeeper address. + * + * @return the zookeeper address + */ + public String getZookeeperAddress() { + return zookeeperAddress; + } + + /** + * Sets the zookeeper address. + * + * @param zookeeperAddress the zookeeper address + */ + public void setZookeeperAddress(final String zookeeperAddress) { + this.zookeeperAddress = zookeeperAddress; + } + + /** + * Gets the zookeeper connect sleep time. + * + * @return the zookeeper connect sleep time + */ + public int getZookeeperConnectSleepTime() { + return zookeeperConnectSleepTime; + } + + /** + * Sets the zookeeper connect sleep time. + * + * @param zookeeperConnectSleepTime the zookeeper connect sleep time + */ + public void setZookeeperConnectSleepTime(final int zookeeperConnectSleepTime) { + this.zookeeperConnectSleepTime = zookeeperConnectSleepTime; + } + + /** + * Gets the zookeeper context retries. + * + * @return the zookeeper context retries + */ + public int getZookeeperContextRetries() { + return zookeeperContextRetries; + } + + /** + * Sets the zookeeper context retries. + * + * @param zookeeperContextRetries the zookeeper context retries + */ + public void setZookeeperContextRetries(final int zookeeperContextRetries) { + this.zookeeperContextRetries = zookeeperContextRetries; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.context.parameters.LockManagerParameters#toString() + */ + @Override + public String toString() { + return "CuratorLockManagerParameters [zookeeperAddress=" + zookeeperAddress + ", zookeeperConnectSleepTime=" + + zookeeperConnectSleepTime + ", zookeeperContextRetries=" + zookeeperContextRetries + "]"; + } +} diff --git a/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorReentrantReadWriteLock.java b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorReentrantReadWriteLock.java new file mode 100644 index 000000000..22bf5e596 --- /dev/null +++ b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/CuratorReentrantReadWriteLock.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.context.locking.curator; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; + +/** + * This class maps a Curator {@link InterProcessReadWriteLock} to a Java {@link ReadWriteLock}. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class CuratorReentrantReadWriteLock implements ReadWriteLock { + // The Lock ID + private final String lockID; + + // The Curator lock + private final InterProcessReadWriteLock curatorReadWriteLock; + + // The Curator Lock facades for read and write locks + private final CuratorLockFacade readLockFacade; + private final CuratorLockFacade writeLockFacade; + + /** + * Create a Curator lock. + * + * @param curatorFramework the Curator framework to use to create the lock + * @param lockId The unique ID of the lock. + */ + public CuratorReentrantReadWriteLock(final CuratorFramework curatorFramework, final String lockId) { + lockID = lockId; + + // Create the Curator lock + curatorReadWriteLock = new InterProcessReadWriteLock(curatorFramework, lockId); + + // Create the lock facades + readLockFacade = new CuratorLockFacade(curatorReadWriteLock.readLock(), lockId); + writeLockFacade = new CuratorLockFacade(curatorReadWriteLock.writeLock(), lockId); + } + + /** + * Get the lock Id of the lock. + * + * @return the lock ID + */ + public String getLockID() { + return lockID; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.ReadWriteLock#readLock() + */ + @Override + public Lock readLock() { + return readLockFacade; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.locks.ReadWriteLock#writeLock() + */ + @Override + public Lock writeLock() { + return writeLockFacade; + } +} diff --git a/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/package-info.java b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/package-info.java new file mode 100644 index 000000000..d867c396a --- /dev/null +++ b/plugins/plugins-context/context-locking/context-locking-curator/src/main/java/org/onap/policy/apex/plugins/context/locking/curator/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements locking on context items in APEX context albums using + * <a href="http://curator.apache.org/">Curator</a> centralized locking. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.plugins.context.locking.curator; |