aboutsummaryrefslogtreecommitdiffstats
path: root/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java')
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java295
1 files changed, 295 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java
new file mode 100644
index 000000000..f7576f139
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java
@@ -0,0 +1,295 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.ophistory;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.eclipse.persistence.config.PersistenceUnitProperties;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.common.utils.jpa.EntityMgrCloser;
+import org.onap.policy.common.utils.jpa.EntityTransCloser;
+import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.database.operationshistory.Dbao;
+import org.onap.policy.guard.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data manager that stores records in the DB, asynchronously, using a background thread.
+ */
+public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager {
+ private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class);
+
+ /**
+ * Added to the end of {@link #operations} when {@link #stop()} is called. This is
+ * used to get the background thread out of a blocking wait for the next record.
+ */
+ private static final Record END_MARKER = new Record();
+
+ // copied from the parameters
+ private final int maxQueueLength;
+ private final int batchSize;
+
+ private final EntityManagerFactory emFactory;
+
+ /**
+ * Thread that takes records from {@link #operations} and stores them in the DB.
+ */
+ private Thread thread;
+
+ /**
+ * Set to {@code true} to stop the background thread.
+ */
+ private boolean stopped = false;
+
+ /**
+ * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called,
+ * an {@link #END_MARKER} is added to the end of the queue.
+ */
+ private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>();
+
+ /**
+ * Number of records that have been added to the DB by this data manager instance.
+ */
+ @Getter
+ private long recordsAdded = 0;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param params data manager parameters
+ */
+ public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) {
+ ValidationResult result = params.validate("data-manager-properties");
+ if (!result.isValid()) {
+ throw new IllegalArgumentException(result.getResult());
+ }
+
+ this.maxQueueLength = params.getMaxQueueLength();
+ this.batchSize = params.getBatchSize();
+
+ // create the factory using the properties
+ Properties props = toProperties(params);
+ this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props);
+ }
+
+ @Override
+ public synchronized void start() {
+ if (stopped || thread != null) {
+ // already started
+ return;
+ }
+
+ thread = makeThread(emFactory, this::run);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ stopped = true;
+
+ if (thread == null) {
+ // no thread to close the factory - do it here
+ emFactory.close();
+
+ } else {
+ // the thread will close the factory when it sees the end marker
+ operations.add(END_MARKER);
+ }
+ }
+
+ @Override
+ public synchronized void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) {
+
+ if (stopped) {
+ logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId,
+ event, operation);
+ return;
+ }
+
+ operations.add(new Record(requestId, event, operation));
+
+ if (operations.size() > maxQueueLength) {
+ Record discarded = operations.remove();
+ logger.warn("too many items to store in the operation history table, discarding {}", discarded);
+ }
+ }
+
+ /**
+ * Takes records from {@link #operations} and stores them in the queue. Continues to
+ * run until {@link #stop()} is invoked, or the thread is interrupted.
+ *
+ * @param emfactory entity manager factory
+ */
+ private void run(EntityManagerFactory emfactory) {
+ try {
+ // store records until stopped, continuing if an exception occurs
+ while (!stopped) {
+ try {
+ Record triple = operations.take();
+ storeBatch(emfactory.createEntityManager(), triple);
+
+ } catch (RuntimeException e) {
+ logger.error("failed to save data to operation history table", e);
+
+ } catch (InterruptedException e) {
+ logger.error("interrupted, discarding remaining operation history data", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ storeRemainingRecords(emfactory);
+
+ } finally {
+ synchronized (this) {
+ stopped = true;
+ }
+
+ emfactory.close();
+ }
+ }
+
+ /**
+ * Store any remaining records, but stop at the first exception.
+ *
+ * @param emfactory entity manager factory
+ */
+ private void storeRemainingRecords(EntityManagerFactory emfactory) {
+ try {
+ while (!operations.isEmpty()) {
+ storeBatch(emfactory.createEntityManager(), operations.poll());
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("failed to save remaining data to operation history table", e);
+ }
+ }
+
+ /**
+ * Stores a batch of records.
+ *
+ * @param entityManager entity manager
+ * @param firstRecord first record to be stored
+ */
+ private void storeBatch(EntityManager entityManager, Record firstRecord) {
+
+ try (EntityMgrCloser emc = new EntityMgrCloser(entityManager);
+ EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) {
+
+ int nrecords = 0;
+ Record record = firstRecord;
+
+ while (record != null && record != END_MARKER) {
+ storeRecord(entityManager, record);
+
+ if (++nrecords >= batchSize) {
+ break;
+ }
+
+ record = operations.poll();
+ }
+
+ trans.commit();
+ recordsAdded += nrecords;
+ }
+ }
+
+ /**
+ * Stores a record.
+ *
+ * @param entityManager entity manager
+ * @param record record to be stored
+ */
+ private void storeRecord(EntityManager entityMgr, Record record) {
+
+ Dbao newEntry = new Dbao();
+
+ final VirtualControlLoopEvent event = record.getEvent();
+ final ControlLoopOperation operation = record.getOperation();
+
+ newEntry.setClosedLoopName(event.getClosedLoopControlName());
+ newEntry.setRequestId(record.getRequestId());
+ newEntry.setActor(operation.getActor());
+ newEntry.setOperation(operation.getOperation());
+ newEntry.setTarget(operation.getTarget());
+ newEntry.setSubrequestId(operation.getSubRequestId());
+ newEntry.setMessage(operation.getMessage());
+ newEntry.setOutcome(operation.getOutcome());
+ if (operation.getStart() != null) {
+ newEntry.setStarttime(new Date(operation.getStart().toEpochMilli()));
+ }
+ if (operation.getEnd() != null) {
+ newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli()));
+ }
+
+ entityMgr.persist(newEntry);
+ }
+
+ /**
+ * Converts the parameters to Properties.
+ *
+ * @param params parameters to be converted
+ * @return a new property set
+ */
+ private Properties toProperties(OperationHistoryDataManagerParams params) {
+ Properties props = new Properties();
+ props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl());
+ props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName());
+ props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword());
+ props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader());
+
+ return props;
+ }
+
+ @Getter
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @ToString
+ private static class Record {
+ private String requestId;
+ private VirtualControlLoopEvent event;
+ private ControlLoopOperation operation;
+ }
+
+ // the following may be overridden by junit tests
+
+ protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
+ return Persistence.createEntityManagerFactory(opsHistPu, props);
+ }
+
+ protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
+ return new Thread(() -> command.accept(emfactory));
+ }
+}