diff options
6 files changed, 37 insertions, 33 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 9b6f41ec23..56515df776 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -45,13 +45,12 @@ spring: username: ${DB_USERNAME}
password: ${DB_PASSWORD}
driverClassName: org.postgresql.Driver
- initialization-mode: always
hikari:
minimumIdle: 5
maximumPoolSize: 80
- idleTimeout: 120000
- connectionTimeout: 300000
- leakDetectionThreshold: 300000
+ idleTimeout: 60000
+ connectionTimeout: 120000
+ leakDetectionThreshold: 2000
pool-name: CpsDatabasePool
cache:
@@ -91,6 +90,9 @@ spring: default-property-inclusion: NON_NULL
serialization:
FAIL_ON_EMPTY_BEANS: false
+ sql:
+ init:
+ mode: ALWAYS
app:
ncmp:
async-m2m:
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java index d457f2601b..d5b459b025 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java @@ -56,7 +56,7 @@ public class DmiRestClient { } catch (final HttpStatusCodeException httpStatusCodeException) { final String exceptionMessage = "Unable to " + operation.toString() + " resource data."; throw new HttpClientRequestException(exceptionMessage, httpStatusCodeException.getResponseBodyAsString(), - httpStatusCodeException.getRawStatusCode()); + httpStatusCodeException.getRawStatusCode()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java index ada3dc6744..f914547a50 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -71,6 +71,7 @@ public class ModuleSyncTasks { moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); } catch (final Exception e) { + log.warn("Processing module sync batch failed."); syncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); @@ -81,6 +82,7 @@ public class ModuleSyncTasks { lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandelStatePerCmHandle); } finally { batchCounter.getAndDecrement(); + log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get()); } return COMPLETED_FUTURE; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 73954c36b3..64d111f993 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -62,16 +62,22 @@ public class ModuleSyncWatchdog { */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { + log.info("Processing module sync watchdog waking up."); populateWorkQueueIfNeeded(); final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); - while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) { - batchCounter.getAndIncrement(); - final Collection<DataNode> nextBatch = prepareNextBatch(); - asyncTaskExecutor.executeTask(() -> - moduleSyncTasks.performModuleSync(nextBatch, batchCounter), - ASYNC_TASK_TIMEOUT_IN_MILLISECONDS - ); - preventBusyWait(); + while (!moduleSyncWorkQueue.isEmpty()) { + if (batchCounter.get() <= asyncTaskParallelismLevel) { + final Collection<DataNode> nextBatch = prepareNextBatch(); + log.debug("Processing module sync batch of {}. {} batch(es) active.", + nextBatch.size(), batchCounter.get()); + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS + ); + batchCounter.getAndIncrement(); + } else { + preventBusyWait(); + } } } @@ -80,6 +86,7 @@ public class ModuleSyncWatchdog { */ @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") public void resetPreviouslyFailedCmHandles() { + log.info("Processing module sync retry-watchdog waking up."); final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } @@ -95,6 +102,7 @@ public class ModuleSyncWatchdog { private void populateWorkQueueIfNeeded() { if (moduleSyncWorkQueue.isEmpty()) { final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles(); + log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size()); for (final DataNode advisedCmHandle : advisedCmHandles) { if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index e5240c0e66..dd989bf676 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -52,20 +52,19 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module sync advised cm handles with #scenario.'() { given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) - and: 'the executor has #parallelismLevel available threads' - spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel + and: 'the executor has enough available threads' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3 when: ' module sync is started' objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) where: ' the following parameter are used' - scenario | parallelismLevel | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions - 'less then 1 batch' | 9 | 1 || 1 - 'exactly 1 batch' | 9 | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 - '2 batches' | 9 | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 - 'queue capacity' | 9 | testQueueCapacity || 3 - 'over queue capacity' | 9 | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 - 'not enough threads' | 2 | testQueueCapacity || 2 + scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'less then 1 batch' | 1 || 1 + 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 + '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 + 'queue capacity' | testQueueCapacity || 3 + 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 } def 'Reset failed cm handles.'() { diff --git a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java index f07f7f8d5b..47a3e8f319 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java @@ -20,7 +20,6 @@ package org.onap.cps.spi.repository; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,12 +28,14 @@ import javax.persistence.PersistenceContext; import javax.persistence.Query; import javax.transaction.Transactional; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.cpspath.parser.CpsPathPrefixType; import org.onap.cps.cpspath.parser.CpsPathQuery; import org.onap.cps.spi.entities.FragmentEntity; import org.onap.cps.utils.JsonObjectMapper; @RequiredArgsConstructor +@Slf4j public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCpsPathQuery { public static final String SIMILAR_TO_ABSOLUTE_PATH_PREFIX = "%/"; @@ -62,16 +63,8 @@ public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCps addTextFunctionCondition(cpsPathQuery, sqlStringBuilder, queryParameters); final Query query = entityManager.createNativeQuery(sqlStringBuilder.toString(), FragmentEntity.class); setQueryParameters(query, queryParameters); - return getFragmentEntitiesAsStream(query); - } - - private List<FragmentEntity> getFragmentEntitiesAsStream(final Query query) { - final List<FragmentEntity> fragmentEntities = new ArrayList<>(); - query.getResultStream().forEach(fragmentEntity -> { - fragmentEntities.add((FragmentEntity) fragmentEntity); - entityManager.detach(fragmentEntity); - }); - + final List<FragmentEntity> fragmentEntities = query.getResultList(); + log.debug("Fetched {} fragment entities by anchor and cps path.", fragmentEntities.size()); return fragmentEntities; } |