diff options
Diffstat (limited to 'controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java')
-rw-r--r-- | controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java new file mode 100644 index 000000000..f7576f139 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java @@ -0,0 +1,295 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.ophistory; + +import java.util.Date; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.eclipse.persistence.config.PersistenceUnitProperties; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.utils.jpa.EntityMgrCloser; +import org.onap.policy.common.utils.jpa.EntityTransCloser; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.database.operationshistory.Dbao; +import org.onap.policy.guard.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data manager that stores records in the DB, asynchronously, using a background thread. + */ +public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager { + private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class); + + /** + * Added to the end of {@link #operations} when {@link #stop()} is called. This is + * used to get the background thread out of a blocking wait for the next record. + */ + private static final Record END_MARKER = new Record(); + + // copied from the parameters + private final int maxQueueLength; + private final int batchSize; + + private final EntityManagerFactory emFactory; + + /** + * Thread that takes records from {@link #operations} and stores them in the DB. + */ + private Thread thread; + + /** + * Set to {@code true} to stop the background thread. + */ + private boolean stopped = false; + + /** + * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called, + * an {@link #END_MARKER} is added to the end of the queue. + */ + private final BlockingQueue<Record> operations = new LinkedBlockingQueue<>(); + + /** + * Number of records that have been added to the DB by this data manager instance. + */ + @Getter + private long recordsAdded = 0; + + + /** + * Constructs the object. + * + * @param params data manager parameters + */ + public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) { + ValidationResult result = params.validate("data-manager-properties"); + if (!result.isValid()) { + throw new IllegalArgumentException(result.getResult()); + } + + this.maxQueueLength = params.getMaxQueueLength(); + this.batchSize = params.getBatchSize(); + + // create the factory using the properties + Properties props = toProperties(params); + this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props); + } + + @Override + public synchronized void start() { + if (stopped || thread != null) { + // already started + return; + } + + thread = makeThread(emFactory, this::run); + thread.setDaemon(true); + thread.start(); + } + + @Override + public synchronized void stop() { + stopped = true; + + if (thread == null) { + // no thread to close the factory - do it here + emFactory.close(); + + } else { + // the thread will close the factory when it sees the end marker + operations.add(END_MARKER); + } + } + + @Override + public synchronized void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) { + + if (stopped) { + logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId, + event, operation); + return; + } + + operations.add(new Record(requestId, event, operation)); + + if (operations.size() > maxQueueLength) { + Record discarded = operations.remove(); + logger.warn("too many items to store in the operation history table, discarding {}", discarded); + } + } + + /** + * Takes records from {@link #operations} and stores them in the queue. Continues to + * run until {@link #stop()} is invoked, or the thread is interrupted. + * + * @param emfactory entity manager factory + */ + private void run(EntityManagerFactory emfactory) { + try { + // store records until stopped, continuing if an exception occurs + while (!stopped) { + try { + Record triple = operations.take(); + storeBatch(emfactory.createEntityManager(), triple); + + } catch (RuntimeException e) { + logger.error("failed to save data to operation history table", e); + + } catch (InterruptedException e) { + logger.error("interrupted, discarding remaining operation history data", e); + Thread.currentThread().interrupt(); + return; + } + } + + storeRemainingRecords(emfactory); + + } finally { + synchronized (this) { + stopped = true; + } + + emfactory.close(); + } + } + + /** + * Store any remaining records, but stop at the first exception. + * + * @param emfactory entity manager factory + */ + private void storeRemainingRecords(EntityManagerFactory emfactory) { + try { + while (!operations.isEmpty()) { + storeBatch(emfactory.createEntityManager(), operations.poll()); + } + + } catch (RuntimeException e) { + logger.error("failed to save remaining data to operation history table", e); + } + } + + /** + * Stores a batch of records. + * + * @param entityManager entity manager + * @param firstRecord first record to be stored + */ + private void storeBatch(EntityManager entityManager, Record firstRecord) { + + try (EntityMgrCloser emc = new EntityMgrCloser(entityManager); + EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) { + + int nrecords = 0; + Record record = firstRecord; + + while (record != null && record != END_MARKER) { + storeRecord(entityManager, record); + + if (++nrecords >= batchSize) { + break; + } + + record = operations.poll(); + } + + trans.commit(); + recordsAdded += nrecords; + } + } + + /** + * Stores a record. + * + * @param entityManager entity manager + * @param record record to be stored + */ + private void storeRecord(EntityManager entityMgr, Record record) { + + Dbao newEntry = new Dbao(); + + final VirtualControlLoopEvent event = record.getEvent(); + final ControlLoopOperation operation = record.getOperation(); + + newEntry.setClosedLoopName(event.getClosedLoopControlName()); + newEntry.setRequestId(record.getRequestId()); + newEntry.setActor(operation.getActor()); + newEntry.setOperation(operation.getOperation()); + newEntry.setTarget(operation.getTarget()); + newEntry.setSubrequestId(operation.getSubRequestId()); + newEntry.setMessage(operation.getMessage()); + newEntry.setOutcome(operation.getOutcome()); + if (operation.getStart() != null) { + newEntry.setStarttime(new Date(operation.getStart().toEpochMilli())); + } + if (operation.getEnd() != null) { + newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli())); + } + + entityMgr.persist(newEntry); + } + + /** + * Converts the parameters to Properties. + * + * @param params parameters to be converted + * @return a new property set + */ + private Properties toProperties(OperationHistoryDataManagerParams params) { + Properties props = new Properties(); + props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl()); + props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName()); + props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword()); + props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader()); + + return props; + } + + @Getter + @NoArgsConstructor + @AllArgsConstructor + @ToString + private static class Record { + private String requestId; + private VirtualControlLoopEvent event; + private ControlLoopOperation operation; + } + + // the following may be overridden by junit tests + + protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) { + return Persistence.createEntityManagerFactory(opsHistPu, props); + } + + protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) { + return new Thread(() -> command.accept(emfactory)); + } +} |