diff options
Diffstat (limited to 'cps-ri/src/main/java/org')
3 files changed, 131 insertions, 13 deletions
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index 847a1d1297..daf4dd757b 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -220,6 +220,12 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService sessionManager.closeSession(sessionId); } + @Override + public void lockAnchor(final String sessionId, final String dataspaceName, + final String anchorName, final Long timeoutInMilliseconds) { + sessionManager.lockAnchor(sessionId, dataspaceName, anchorName, timeoutInMilliseconds); + } + private static Set<String> processAncestorXpath(final List<FragmentEntity> fragmentEntities, final CpsPathQuery cpsPathQuery) { final Set<String> ancestorXpath = new HashSet<>(); diff --git a/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java b/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java index eb535ecc37..e2786887ac 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2022 Nordix Foundation + * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,25 +20,43 @@ package org.onap.cps.spi.utils; -import java.util.HashMap; -import java.util.Map; +import com.google.common.util.concurrent.TimeLimiter; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.hibernate.HibernateException; +import org.hibernate.LockMode; import org.hibernate.Session; -import org.hibernate.SessionException; import org.hibernate.SessionFactory; import org.hibernate.cfg.Configuration; import org.onap.cps.spi.entities.AnchorEntity; import org.onap.cps.spi.entities.DataspaceEntity; import org.onap.cps.spi.entities.SchemaSetEntity; import org.onap.cps.spi.entities.YangResourceEntity; +import org.onap.cps.spi.exceptions.SessionManagerException; +import org.onap.cps.spi.exceptions.SessionTimeoutException; +import org.onap.cps.spi.repository.AnchorRepository; +import org.onap.cps.spi.repository.DataspaceRepository; import org.springframework.stereotype.Component; +@RequiredArgsConstructor +@Slf4j @Component public class SessionManager { + private final TimeLimiterProvider timeLimiterProvider; + private final DataspaceRepository dataspaceRepository; + private final AnchorRepository anchorRepository; private static SessionFactory sessionFactory; - private static Map<String, Session> sessionMap = new HashMap<>(); + private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>(); private synchronized void buildSessionFactory() { if (sessionFactory == null) { @@ -67,20 +85,81 @@ public class SessionManager { /** * Close session. + * Locks will be released and changes will be committed. * * @param sessionId session ID */ public void closeSession(final String sessionId) { try { - final Session currentSession = sessionMap.get(sessionId); - currentSession.getTransaction().commit(); - currentSession.close(); - } catch (final NullPointerException e) { - throw new SessionException(String.format("Session with session ID %s does not exist", sessionId)); + final Session session = getSession(sessionId); + session.getTransaction().commit(); + session.close(); } catch (final HibernateException e) { - throw new SessionException(String.format("Unable to close session with session ID %s", sessionId)); + throw new SessionManagerException("Cannot close session", + String.format("Unable to close session with session ID '%s'", sessionId), e); + } finally { + sessionMap.remove(sessionId); } - sessionMap.remove(sessionId); } -}
\ No newline at end of file + /** + * Lock Anchor. + * To release locks(s), the session holding the lock(s) must be closed. + * + * @param sessionId session ID + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param timeoutInMilliseconds lock attempt timeout in milliseconds + */ + @SneakyThrows + public void lockAnchor(final String sessionId, final String dataspaceName, + final String anchorName, final Long timeoutInMilliseconds) { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final TimeLimiter timeLimiter = timeLimiterProvider.getTimeLimiter(executorService); + + try { + timeLimiter.callWithTimeout(() -> { + applyPessimisticWriteLockOnAnchor(sessionId, dataspaceName, anchorName); + return null; + }, timeoutInMilliseconds, TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + throw new SessionTimeoutException( + "Timeout: Anchor locking failed", + "The error could be caused by another session holding a lock on the specified table. " + + "Retrying the sending the request could be required.", e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SessionManagerException("Operation interrupted", "This thread was interrupted.", e); + } catch (final ExecutionException | UncheckedExecutionException e) { + if (e.getCause() != null) { + throw e.getCause(); + } + throw new SessionManagerException( + "Operation Aborted", + "The transaction request was aborted. " + + "Retrying and checking all details are correct could be required", e); + } finally { + executorService.shutdownNow(); + } + } + + private void applyPessimisticWriteLockOnAnchor(final String sessionId, final String dataspaceName, + final String anchorName) { + final Session session = getSession(sessionId); + final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName); + final AnchorEntity anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName); + final int anchorId = anchorEntity.getId(); + log.debug("Attempting to lock anchor {} for session {}", anchorName, sessionId); + session.get(AnchorEntity.class, anchorId, LockMode.PESSIMISTIC_WRITE); + log.info("Anchor {} successfully locked", anchorName); + } + + private Session getSession(final String sessionId) { + final Session session = sessionMap.get(sessionId); + if (session == null) { + throw new SessionManagerException("Session not found", + String.format("Session with ID %s does not exist", sessionId)); + } + return session; + } +} diff --git a/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java b/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java new file mode 100644 index 0000000000..2bd7ac3763 --- /dev/null +++ b/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.spi.utils; + +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.TimeLimiter; +import java.util.concurrent.ExecutorService; +import org.springframework.stereotype.Component; + +@Component +public class TimeLimiterProvider { + public TimeLimiter getTimeLimiter(final ExecutorService executorService) { + return SimpleTimeLimiter.create(executorService); + } +} |