aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java')
-rw-r--r--catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java309
1 files changed, 148 insertions, 161 deletions
diff --git a/catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java b/catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java
index d677a7e257..758e46544d 100644
--- a/catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java
+++ b/catalog-model/src/main/java/org/openecomp/sdc/be/model/operations/impl/CacheMangerOperation.java
@@ -20,185 +20,172 @@
package org.openecomp.sdc.be.model.operations.impl;
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.openecomp.sdc.be.config.Configuration;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.dao.titan.TitanGenericDao;
import org.openecomp.sdc.be.datatypes.enums.NodeTypeEnum;
import org.openecomp.sdc.be.model.cache.ComponentCache;
import org.openecomp.sdc.be.model.cache.DaoInfo;
-import org.openecomp.sdc.be.model.cache.jobs.CheckAndUpdateJob;
-import org.openecomp.sdc.be.model.cache.jobs.DeleteJob;
-import org.openecomp.sdc.be.model.cache.jobs.Job;
-import org.openecomp.sdc.be.model.cache.jobs.OverrideJob;
-import org.openecomp.sdc.be.model.cache.jobs.StoreJob;
+import org.openecomp.sdc.be.model.cache.jobs.*;
import org.openecomp.sdc.be.model.cache.workers.CacheWorker;
import org.openecomp.sdc.be.model.cache.workers.IWorker;
import org.openecomp.sdc.be.model.cache.workers.SyncWorker;
import org.openecomp.sdc.be.model.jsontitan.operations.ToscaOperationFacade;
import org.openecomp.sdc.be.model.operations.api.ICacheMangerOperation;
-import org.openecomp.sdc.be.workers.Manager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.openecomp.sdc.common.log.wrappers.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.LinkedList;
+import java.util.concurrent.*;
/**
* Created by mlando on 9/5/2016. the class is responsible for handling all cache update operations asynchronously including sync between the graph and cache and on demand update requests
*/
@Component("cacheManger-operation")
public class CacheMangerOperation implements ICacheMangerOperation {
- @Autowired
- private ToscaOperationFacade toscaOperationFacade;
- @Autowired
- private TitanGenericDao titanGenericDao;
- @Autowired
- private ComponentCache componentCache;
-
- private static Logger log = LoggerFactory.getLogger(Manager.class.getName());
- private LinkedBlockingQueue<Job> jobQueue = null;
- private int waitOnShutDownInMinutes;
- private ScheduledExecutorService syncExecutor;
- private ExecutorService workerExecutor;
- private LinkedList<IWorker> workerList = new LinkedList<>();
- private DaoInfo daoInfo;
-
- /**
- * constructor
- */
- public CacheMangerOperation() {
- }
-
- /**
- * the method checks in the cache is enabled, if it is, it initializes all the workers according to the configuration values.
- */
- @PostConstruct
- public void init() {
-
- daoInfo = new DaoInfo(toscaOperationFacade, componentCache);
-
- Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
- if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
- Integer numberOfWorkers = applicationL2CacheConfig.getQueue().getNumberOfCacheWorkers();
- this.waitOnShutDownInMinutes = applicationL2CacheConfig.getQueue().getWaitOnShutDownInMinutes();
- jobQueue = new LinkedBlockingQueue<>();
- log.info("L2 Cache is enabled inishilsing queue");
- log.debug("initializing SyncWorker, creating {} workers");
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Sync-Cache-Worker-%d").build();
- this.syncExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
- log.debug("initializing workers, creating {} cacheWorkers", numberOfWorkers);
- threadFactory = new ThreadFactoryBuilder().setNameFormat("Cache-Worker-%d").build();
- String workerName = "Sync-Worker";
- Integer syncWorkerExacutionIntrval = applicationL2CacheConfig.getQueue().getSyncIntervalInSecondes();
- log.debug("starting Sync worker:{} with executions interval:{} ", workerName, syncWorkerExacutionIntrval);
- SyncWorker syncWorker = new SyncWorker(workerName, this);
- this.syncExecutor.scheduleAtFixedRate(syncWorker, 5 * 60, syncWorkerExacutionIntrval, TimeUnit.SECONDS);
- this.workerExecutor = Executors.newFixedThreadPool(numberOfWorkers, threadFactory);
- CacheWorker cacheWorker;
- for (int i = 0; i < numberOfWorkers; i++) {
- workerName = "Cache-Worker-" + i;
- log.debug("starting Cache worker:{}", workerName);
- cacheWorker = new CacheWorker(workerName, jobQueue);
- this.workerExecutor.submit(cacheWorker);
- this.workerList.add(cacheWorker);
- }
- } else {
- log.info("L2 Cache is disabled");
- }
- log.info("L2 Cache has been initialized and the workers are running");
- }
-
- /**
- * the method creates a job to check it the given component is in the cach and if so is it valid if the value in the cache is not valid it will be updated.
- *
- * @param componentId
- * the uid of the component we want to update
- * @param timestamp
- * the time of the component update
- * @param nodeTypeEnum
- * the type of the component resource/service/product
- */
- @Override
- public void updateComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
- Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
- if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
- this.jobQueue.add(new CheckAndUpdateJob(daoInfo, componentId, nodeTypeEnum, timestamp));
- }
- }
-
- public void overideComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
- Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
- if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
- this.jobQueue.add(new OverrideJob(daoInfo, componentId, nodeTypeEnum, timestamp));
- }
- }
-
- public void deleteComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
- Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
- if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
- this.jobQueue.add(new DeleteJob(daoInfo, componentId, nodeTypeEnum, timestamp));
- }
- }
-
- /**
- * the method stores the given component in the cache
- *
- * @param component
- * componet to store in cache
- * @param nodeTypeEnum
- * the type of the component we want to store
- */
- @Override
- public void storeComponentInCache(org.openecomp.sdc.be.model.Component component, NodeTypeEnum nodeTypeEnum) {
- Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
- if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
- this.jobQueue.add(new StoreJob(daoInfo, component, nodeTypeEnum));
- }
- }
-
- /**
- * the method shutdown's all the worker's. the method has a pre set of how long it will wait for the workers to shutdown. the pre defined value is taken from the configuration.
- */
- @PreDestroy
- public void shutDown() {
- workerExecutor.shutdown();
- syncExecutor.shutdown();
- this.workerList.forEach(e -> e.shutDown());
- try {
- if (!workerExecutor.awaitTermination(this.waitOnShutDownInMinutes, TimeUnit.MINUTES)) {
- log.error("timer elapsed while waiting for Cache workers to finish, forcing a shutdown. ");
- }
- log.debug("all Cache workers finished");
- } catch (InterruptedException e) {
- log.error("failed while waiting for Cache worker", e);
- }
- try {
- if (!workerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
- log.error("timer elapsed while waiting for the Sync worker's to finish, forcing a shutdown. ");
- }
- log.debug("sync worker finished");
- } catch (InterruptedException e) {
- log.error("failed while waiting for sync worker", e);
- }
- }
-
- public TitanGenericDao getTitanGenericDao() {
- return titanGenericDao;
- }
-
- public ComponentCache getComponentCache() {
- return componentCache;
- }
+ @Autowired
+ private ToscaOperationFacade toscaOperationFacade;
+ @Autowired
+ private TitanGenericDao titanGenericDao;
+ @Autowired
+ private ComponentCache componentCache;
+
+ private static final Logger log = Logger.getLogger(CacheMangerOperation.class.getName());
+ private LinkedBlockingQueue<Job> jobQueue = null;
+ private int waitOnShutDownInMinutes;
+ private ScheduledExecutorService syncExecutor;
+ private ExecutorService workerExecutor;
+ private LinkedList<IWorker> workerList = new LinkedList<>();
+ private DaoInfo daoInfo;
+
+ /**
+ * constructor
+ */
+ public CacheMangerOperation() {
+ }
+
+ /**
+ * the method checks in the cache is enabled, if it is, it initializes all the workers according to the configuration values.
+ */
+ @PostConstruct
+ public void init() {
+
+ daoInfo = new DaoInfo(toscaOperationFacade, componentCache);
+
+ Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
+ if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
+ Integer numberOfWorkers = applicationL2CacheConfig.getQueue().getNumberOfCacheWorkers();
+ this.waitOnShutDownInMinutes = applicationL2CacheConfig.getQueue().getWaitOnShutDownInMinutes();
+ jobQueue = new LinkedBlockingQueue<>();
+ log.info("L2 Cache is enabled initializing queue");
+ log.debug("initializing SyncWorker, creating {} workers", numberOfWorkers);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Sync-Cache-Worker-%d").build();
+ this.syncExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ log.debug("initializing workers, creating {} cacheWorkers", numberOfWorkers);
+ threadFactory = new ThreadFactoryBuilder().setNameFormat("Cache-Worker-%d").build();
+ String workerName = "Sync-Worker";
+ Integer syncWorkerExacutionIntrval = applicationL2CacheConfig.getQueue().getSyncIntervalInSecondes();
+ log.debug("starting Sync worker:{} with executions interval:{} ", workerName, syncWorkerExacutionIntrval);
+ SyncWorker syncWorker = new SyncWorker(workerName, this);
+ this.syncExecutor.scheduleAtFixedRate(syncWorker, 5 * 60, syncWorkerExacutionIntrval, TimeUnit.SECONDS);
+ this.workerExecutor = Executors.newFixedThreadPool(numberOfWorkers, threadFactory);
+ CacheWorker cacheWorker;
+ for (int i = 0; i < numberOfWorkers; i++) {
+ workerName = "Cache-Worker-" + i;
+ log.debug("starting Cache worker:{}", workerName);
+ cacheWorker = new CacheWorker(workerName, jobQueue);
+ this.workerExecutor.submit(cacheWorker);
+ this.workerList.add(cacheWorker);
+ }
+ } else {
+ log.info("L2 Cache is disabled");
+ }
+ log.info("L2 Cache has been initialized and the workers are running");
+ }
+
+ /**
+ * the method creates a job to check it the given component is in the cach and if so is it valid if the value in the cache is not valid it will be updated.
+ *
+ * @param componentId
+ * the uid of the component we want to update
+ * @param timestamp
+ * the time of the component update
+ * @param nodeTypeEnum
+ * the type of the component resource/service/product
+ */
+ @Override
+ public void updateComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
+ Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
+ if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
+ this.jobQueue.add(new CheckAndUpdateJob(daoInfo, componentId, nodeTypeEnum, timestamp));
+ }
+ }
+
+ public void overideComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
+ Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
+ if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
+ this.jobQueue.add(new OverrideJob(daoInfo, componentId, nodeTypeEnum, timestamp));
+ }
+ }
+
+ public void deleteComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
+ Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
+ if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
+ this.jobQueue.add(new DeleteJob(daoInfo, componentId, nodeTypeEnum, timestamp));
+ }
+ }
+
+ /**
+ * the method stores the given component in the cache
+ *
+ * @param component
+ * componet to store in cache
+ * @param nodeTypeEnum
+ * the type of the component we want to store
+ */
+ @Override
+ public void storeComponentInCache(org.openecomp.sdc.be.model.Component component, NodeTypeEnum nodeTypeEnum) {
+ Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
+ if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
+ this.jobQueue.add(new StoreJob(daoInfo, component, nodeTypeEnum));
+ }
+ }
+
+ /**
+ * the method shutdown's all the worker's. the method has a pre set of how long it will wait for the workers to shutdown. the pre defined value is taken from the configuration.
+ */
+ @PreDestroy
+ public void shutDown() {
+ workerExecutor.shutdown();
+ syncExecutor.shutdown();
+ this.workerList.forEach(IWorker::shutDown);
+ try {
+ if (!workerExecutor.awaitTermination(this.waitOnShutDownInMinutes, TimeUnit.MINUTES)) {
+ log.error("timer elapsed while waiting for Cache workers to finish, forcing a shutdown. ");
+ }
+ log.debug("all Cache workers finished");
+ } catch (InterruptedException e) {
+ log.error("failed while waiting for Cache worker", e);
+ }
+ try {
+ if (!workerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ log.error("timer elapsed while waiting for the Sync worker's to finish, forcing a shutdown. ");
+ }
+ log.debug("sync worker finished");
+ } catch (InterruptedException e) {
+ log.error("failed while waiting for sync worker", e);
+ }
+ }
+
+ public TitanGenericDao getTitanGenericDao() {
+ return titanGenericDao;
+ }
+
+ public ComponentCache getComponentCache() {
+ return componentCache;
+ }
}