diff options
Diffstat (limited to 'cps-ri')
6 files changed, 272 insertions, 35 deletions
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index 847a1d1297..daf4dd757b 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -220,6 +220,12 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService sessionManager.closeSession(sessionId); } + @Override + public void lockAnchor(final String sessionId, final String dataspaceName, + final String anchorName, final Long timeoutInMilliseconds) { + sessionManager.lockAnchor(sessionId, dataspaceName, anchorName, timeoutInMilliseconds); + } + private static Set<String> processAncestorXpath(final List<FragmentEntity> fragmentEntities, final CpsPathQuery cpsPathQuery) { final Set<String> ancestorXpath = new HashSet<>(); diff --git a/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java b/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java index eb535ecc37..e2786887ac 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/utils/SessionManager.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2022 Nordix Foundation + * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,25 +20,43 @@ package org.onap.cps.spi.utils; -import java.util.HashMap; -import java.util.Map; +import com.google.common.util.concurrent.TimeLimiter; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.hibernate.HibernateException; +import org.hibernate.LockMode; import org.hibernate.Session; -import org.hibernate.SessionException; import org.hibernate.SessionFactory; import org.hibernate.cfg.Configuration; import org.onap.cps.spi.entities.AnchorEntity; import org.onap.cps.spi.entities.DataspaceEntity; import org.onap.cps.spi.entities.SchemaSetEntity; import org.onap.cps.spi.entities.YangResourceEntity; +import org.onap.cps.spi.exceptions.SessionManagerException; +import org.onap.cps.spi.exceptions.SessionTimeoutException; +import org.onap.cps.spi.repository.AnchorRepository; +import org.onap.cps.spi.repository.DataspaceRepository; import org.springframework.stereotype.Component; +@RequiredArgsConstructor +@Slf4j @Component public class SessionManager { + private final TimeLimiterProvider timeLimiterProvider; + private final DataspaceRepository dataspaceRepository; + private final AnchorRepository anchorRepository; private static SessionFactory sessionFactory; - private static Map<String, Session> sessionMap = new HashMap<>(); + private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>(); private synchronized void buildSessionFactory() { if (sessionFactory == null) { @@ -67,20 +85,81 @@ public class SessionManager { /** * Close session. + * Locks will be released and changes will be committed. * * @param sessionId session ID */ public void closeSession(final String sessionId) { try { - final Session currentSession = sessionMap.get(sessionId); - currentSession.getTransaction().commit(); - currentSession.close(); - } catch (final NullPointerException e) { - throw new SessionException(String.format("Session with session ID %s does not exist", sessionId)); + final Session session = getSession(sessionId); + session.getTransaction().commit(); + session.close(); } catch (final HibernateException e) { - throw new SessionException(String.format("Unable to close session with session ID %s", sessionId)); + throw new SessionManagerException("Cannot close session", + String.format("Unable to close session with session ID '%s'", sessionId), e); + } finally { + sessionMap.remove(sessionId); } - sessionMap.remove(sessionId); } -}
\ No newline at end of file + /** + * Lock Anchor. + * To release locks(s), the session holding the lock(s) must be closed. + * + * @param sessionId session ID + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param timeoutInMilliseconds lock attempt timeout in milliseconds + */ + @SneakyThrows + public void lockAnchor(final String sessionId, final String dataspaceName, + final String anchorName, final Long timeoutInMilliseconds) { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final TimeLimiter timeLimiter = timeLimiterProvider.getTimeLimiter(executorService); + + try { + timeLimiter.callWithTimeout(() -> { + applyPessimisticWriteLockOnAnchor(sessionId, dataspaceName, anchorName); + return null; + }, timeoutInMilliseconds, TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + throw new SessionTimeoutException( + "Timeout: Anchor locking failed", + "The error could be caused by another session holding a lock on the specified table. " + + "Retrying the sending the request could be required.", e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SessionManagerException("Operation interrupted", "This thread was interrupted.", e); + } catch (final ExecutionException | UncheckedExecutionException e) { + if (e.getCause() != null) { + throw e.getCause(); + } + throw new SessionManagerException( + "Operation Aborted", + "The transaction request was aborted. " + + "Retrying and checking all details are correct could be required", e); + } finally { + executorService.shutdownNow(); + } + } + + private void applyPessimisticWriteLockOnAnchor(final String sessionId, final String dataspaceName, + final String anchorName) { + final Session session = getSession(sessionId); + final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName); + final AnchorEntity anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName); + final int anchorId = anchorEntity.getId(); + log.debug("Attempting to lock anchor {} for session {}", anchorName, sessionId); + session.get(AnchorEntity.class, anchorId, LockMode.PESSIMISTIC_WRITE); + log.info("Anchor {} successfully locked", anchorName); + } + + private Session getSession(final String sessionId) { + final Session session = sessionMap.get(sessionId); + if (session == null) { + throw new SessionManagerException("Session not found", + String.format("Session with ID %s does not exist", sessionId)); + } + return session; + } +} diff --git a/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java b/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java new file mode 100644 index 0000000000..2bd7ac3763 --- /dev/null +++ b/cps-ri/src/main/java/org/onap/cps/spi/utils/TimeLimiterProvider.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.spi.utils; + +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.TimeLimiter; +import java.util.concurrent.ExecutorService; +import org.springframework.stereotype.Component; + +@Component +public class TimeLimiterProvider { + public TimeLimiter getTimeLimiter(final ExecutorService executorService) { + return SimpleTimeLimiter.create(executorService); + } +} diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy index 52f2309ccd..b37f471e76 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy @@ -129,4 +129,11 @@ class CpsDataPersistenceServiceSpec extends Specification { then: 'the session manager method to close session is invoked with parameter' 1 * mockSessionManager.closeSession(someSessionId) } + + def 'Lock anchor.'(){ + when: 'lock anchor method is called with anchor entity details' + objectUnderTest.lockAnchor('mySessionId', 'myDataspaceName', 'myAnchorName', 123L) + then: 'the session manager method to lock anchor is invoked with same parameters' + 1 * mockSessionManager.lockAnchor('mySessionId', 'myDataspaceName', 'myAnchorName', 123L) + } }
\ No newline at end of file diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy index c46092f075..9b58c8bc32 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2022 Nordix Foundation + * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,37 +20,50 @@ package org.onap.cps.spi.utils -import org.hibernate.SessionException +import org.onap.cps.spi.exceptions.SessionManagerException import org.onap.cps.spi.impl.CpsPersistenceSpecBase +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.jdbc.Sql class SessionManagerIntegrationSpec extends CpsPersistenceSpecBase{ - def objectUnderTest = new SessionManager(); + final static String SET_DATA = '/data/anchor.sql' - def 'start session'() { - when: 'start session' - def result = objectUnderTest.startSession() - then: 'session ID is returned' - assert result instanceof String - objectUnderTest.closeSession(result) + @Autowired + SessionManager objectUnderTest + + def sessionId + def shortTimeoutForTesting = 200L + + def setup(){ + sessionId = objectUnderTest.startSession() } - def 'close session'(){ - given: 'session Id from calling the start session method' - def sessionId = objectUnderTest.startSession() - when: 'close session method is called' - objectUnderTest.closeSession(sessionId) + def cleanup(){ + objectUnderTest.closeSession(sessionId) + } + + @Sql([CLEAR_DATA, SET_DATA]) + def 'Lock anchor.'(){ + when: 'session tries to acquire anchor lock by passing anchor entity details' + objectUnderTest.lockAnchor(sessionId, DATASPACE_NAME, ANCHOR_NAME1, shortTimeoutForTesting) then: 'no exception is thrown' noExceptionThrown() } - def 'close session that does not exist' (){ - given: 'session Id that does not exist' - def unknownSessionId = 'unknown session id' - when: 'close session method is called' - objectUnderTest.closeSession(unknownSessionId) - then: 'a session exception is thrown' - def thrown = thrown(SessionException) - assert thrown.message.contains(unknownSessionId) + @Sql([CLEAR_DATA, SET_DATA]) + def 'Attempt to lock anchor when another session is holding the lock.'(){ + given: 'another session that holds an anchor lock' + def otherSessionId = objectUnderTest.startSession() + objectUnderTest.lockAnchor(otherSessionId,DATASPACE_NAME,ANCHOR_NAME1,shortTimeoutForTesting) + when: 'a session tries to acquire the same anchor lock' + objectUnderTest.lockAnchor(sessionId,DATASPACE_NAME,ANCHOR_NAME1,shortTimeoutForTesting) + then: 'a session manager exception is thrown specifying operation reached timeout' + def thrown = thrown(SessionManagerException) + thrown.message.contains('Timeout') + then: 'when the other session holding the lock is closed, lock can finally be acquired' + objectUnderTest.closeSession(otherSessionId) + objectUnderTest.lockAnchor(sessionId,DATASPACE_NAME,ANCHOR_NAME1,shortTimeoutForTesting) } + } diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerSpec.groovy new file mode 100644 index 0000000000..a2df06ef0e --- /dev/null +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerSpec.groovy @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.spi.utils + +import com.google.common.util.concurrent.TimeLimiter +import org.hibernate.HibernateException +import org.hibernate.Transaction +import org.onap.cps.spi.entities.AnchorEntity +import org.onap.cps.spi.exceptions.SessionManagerException +import org.onap.cps.spi.repository.AnchorRepository +import org.onap.cps.spi.repository.DataspaceRepository +import org.testcontainers.shaded.com.google.common.util.concurrent.UncheckedExecutionException +import spock.lang.Specification +import org.hibernate.Session + +import java.util.concurrent.ExecutionException + +class SessionManagerSpec extends Specification { + + def spiedTimeLimiterProvider = Spy(TimeLimiterProvider) + def mockDataspaceRepository = Mock(DataspaceRepository) + def mockAnchorRepository = Mock(AnchorRepository) + def mockSession = Mock(Session) + + def objectUnderTest = new SessionManager(spiedTimeLimiterProvider, mockDataspaceRepository, mockAnchorRepository) + + def 'Lock anchor entity with #exceptionDuringTest exception.'(){ + given: 'a dummy session' + objectUnderTest.sessionMap.put('dummySession', mockSession) + and: 'the anchor name can be resolved' + def mockAnchorEntity = Mock(AnchorEntity) + mockAnchorEntity.getId() > 456 + mockAnchorRepository.getByDataspaceAndName(_, _) >> mockAnchorEntity + and: 'timeLimiter throws an #exceptionDuringTest exception' + def mockTimeLimiter = Mock(TimeLimiter) + spiedTimeLimiterProvider.getTimeLimiter(_) >> mockTimeLimiter + mockTimeLimiter.callWithTimeout(*_) >> { throw exceptionDuringTest } + when: 'session tries to acquire anchor lock' + objectUnderTest.lockAnchor('dummySession', 'some-dataspace','some-anchor', 123L) + then: 'a session manager exception is thrown with the expected detail' + def thrown = thrown(SessionManagerException) + thrown.details.contains(expectedExceptionDetail) + where: + exceptionDuringTest || expectedExceptionDetail + new InterruptedException() || 'interrupted' + new ExecutionException() || 'aborted' + } + + def 'Close session that does not exist.'() { + when: 'attempt to close session that does not exist' + objectUnderTest.closeSession('unknown session id') + then: 'a session manager exception is thrown with the unknown id in the details' + def thrown = thrown(SessionManagerException) + assert thrown.details.contains('unknown session id') + } + + def 'Hibernate exception while closing session.'() { + given: 'a test session with a transaction' + objectUnderTest.sessionMap.put('testSessionId', mockSession) + mockSession.getTransaction() >> Mock(Transaction) + and: 'an hibernate exception when closing that session' + def hibernateException = new HibernateException('test') + mockSession.close() >> { throw hibernateException } + when: 'attempt to close session' + objectUnderTest.closeSession('testSessionId') + then: 'a session manager exception is thrown with the session id in the details' + def thrown = thrown(SessionManagerException) + assert thrown.details.contains('testSessionId') + and: 'the original exception as cause' + assert thrown.cause == hibernateException + } + + def 'Attempt to lock anchor entity with session Id that does not exists'(){ + when: 'attempt to acquire anchor lock with session that does not exists' + objectUnderTest.lockAnchor('unknown session id','','',123L) + then: 'a session manager exception is thrown with the unknown id in the details' + def thrown = thrown(SessionManagerException) + thrown.details.contains('unknown session id') + } + +} |