diff options
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java | 674 |
1 files changed, 674 insertions, 0 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java new file mode 100644 index 000000000..20f8aaf75 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java @@ -0,0 +1,674 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.runtime.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; + +import org.onap.policy.apex.context.ContextException; +import org.onap.policy.apex.context.ContextRuntimeException; +import org.onap.policy.apex.context.SchemaHelper; +import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory; +import org.onap.policy.apex.core.engine.engine.ApexEngine; +import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory; +import org.onap.policy.apex.core.engine.event.EnEvent; +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; +import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader; +import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter; +import org.onap.policy.apex.service.engine.runtime.ApexEventListener; +import org.onap.policy.apex.service.engine.runtime.EngineService; +import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +/** + * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies + * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy + * is triggered by an Apex event, and when the policy is triggered it runs through to completion in + * the ApexEngine. + * + * This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it + * events, and receiving events from it. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +final class EngineWorker implements EngineService { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class); + + // The ID of this engine + private final AxArtifactKey engineWorkerKey; + + // The Apex engine which is running the policies in this worker + private final ApexEngine engine; + + // The event processor is an inner class, an instance of which runs as a thread that reads + // incoming events from a queue and forwards them to the Apex engine + private EventProcessor processor = null; + + // Thread handling for the worker + private final ApplicationThreadFactory threadFactory; + private Thread processorThread; + + // Converts ApexEvent instances to and from EnEvent instances + private ApexEvent2EnEventConverter apexEnEventConverter = null; + + /** + * Constructor that creates an Apex engine, an event processor for events to be sent to that + * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB. + * + * @param engineWorkerKey the engine worker key + * @param queue the queue on which events for this Apex worker will come + * @param threadFactory the thread factory to use for creating the event processing thread + * @throws ApexException thrown on errors on worker instantiation + */ + EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue, + final ApplicationThreadFactory threadFactory) throws ApexException { + LOGGER.entry(engineWorkerKey); + + this.engineWorkerKey = engineWorkerKey; + this.threadFactory = threadFactory; + + // Create the Apex engine + engine = new ApexEngineFactory().createApexEngine(engineWorkerKey); + + // Create and run the event processor + processor = new EventProcessor(queue); + + // Set the Event converter up + apexEnEventConverter = new ApexEvent2EnEventConverter(engine); + + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. + * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener) + */ + @Override + public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { + // Sanity checks on the Apex model + if (engine == null) { + LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getID() + + ", failed, listener is null"); + return; + } + + engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter)); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. + * String) + */ + @Override + public void deregisterActionListener(final String listenerName) { + // Sanity checks on the Apex model + if (engine == null) { + LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getID() + + ", failed, listener is null"); + return; + } + + engine.removeEventListener(listenerName); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() + */ + @Override + public EngineServiceEventInterface getEngineServiceEventInterface() { + throw new UnsupportedOperationException( + "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey() + */ + @Override + public AxArtifactKey getKey() { + return engineWorkerKey; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo() + */ + @Override + public Collection<AxArtifactKey> getEngineKeys() { + return Arrays.asList(engineWorkerKey); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey() + */ + @Override + public AxArtifactKey getApexModelKey() { + if (ModelService.existsModel(AxPolicyModel.class)) { + return ModelService.getModel(AxPolicyModel.class).getKey(); + } else { + return null; + } + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. + * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) + */ + @Override + public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag) + throws ApexException { + LOGGER.entry(engineKey); + + // Read the Apex model into memory using the Apex Model Reader + AxPolicyModel apexPolicyModel = null; + try { + final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class); + apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes())); + } catch (final ApexModelException e) { + LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getID(), e); + throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getID(), e); + } + + if (apexPolicyModel == null) { + LOGGER.error("apex model null on engine " + engineKey.getID()); + throw new ApexException("apex model null on engine " + engineKey.getID()); + } + + // Update the Apex model in the Apex engine + updateModel(engineKey, apexPolicyModel, forceFlag); + + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. + * model. basicmodel.concepts.AxArtifactKey, + * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) + */ + @Override + public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag) + throws ApexException { + LOGGER.entry(engineKey); + + // Check if the key on the update request is correct + if (!engineWorkerKey.equals(engineKey)) { + LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + + " of this engine"); + throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + + engineWorkerKey.getID() + " of this engine"); + } + + // Sanity checks on the Apex model + if (engine == null) { + LOGGER.warn("engine with key " + engineKey.getID() + " not initialized"); + throw new ApexException("engine with key " + engineKey.getID() + " not initialized"); + } + + // Check model compatibility + if (ModelService.existsModel(AxPolicyModel.class)) { + // The current policy model may or may not be defined + final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class); + if (!currentModel.getKey().isCompatible(apexModel.getKey())) { + if (forceFlag) { + LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getID() + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getID() + "\""); + } else { + throw new ContextException( + "apex model update failed, supplied model with key \"" + apexModel.getKey().getID() + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getID() + "\""); + } + } + } + + // Update the Apex model in the Apex engine + engine.updateModel(apexModel); + + LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getID(), engineWorkerKey); + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState() + */ + @Override + public AxEngineState getState() { + return engine.getState(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll() + */ + @Override + public void startAll() throws ApexException { + start(this.getKey()); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. + * model. concepts.AxArtifactKey) + */ + @Override + public void start(final AxArtifactKey engineKey) throws ApexException { + LOGGER.entry(engineKey); + + // Check if the key on the start request is correct + if (!engineWorkerKey.equals(engineKey)) { + LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + + " of this engine"); + throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + + engineWorkerKey.getID() + " of this engine"); + } + + if (engine == null) { + LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null"); + throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null"); + } + + // Starts the event processing thread that handles incoming events + if (processorThread != null && processorThread.isAlive()) { + LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " is already running with state " + + getState()); + throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + + " is already running with state " + getState()); + } + + // Start the engine + engine.start(); + + // Start a thread to process events for the engine + processorThread = threadFactory.newThread(processor); + processorThread.start(); + + LOGGER.exit(engineKey); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop() + */ + @Override + public void stop() throws ApexException { + stop(this.getKey()); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. + * model. concepts.AxArtifactKey) + */ + @Override + public void stop(final AxArtifactKey engineKey) throws ApexException { + // Check if the key on the start request is correct + if (!engineWorkerKey.equals(engineKey)) { + LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + + " of this engine"); + throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + + engineWorkerKey.getID() + " of this engine"); + } + + if (engine == null) { + LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null"); + throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null"); + } + + // Interrupt the worker to stop its thread + if (processorThread == null || !processorThread.isAlive()) { + processorThread = null; + + LOGGER.warn("apex engine for engine key" + engineWorkerKey.getID() + " is already stopped with state " + + getState()); + return; + } + + // Interrupt the thread that is handling events toward the engine + processorThread.interrupt(); + + // Stop the engine + engine.stop(); + + LOGGER.exit(engineKey); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted() + */ + @Override + public boolean isStarted() { + return isStarted(this.getKey()); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. + * model. basicmodel.concepts.AxArtifactKey) + */ + @Override + public boolean isStarted(final AxArtifactKey engineKey) { + final AxEngineState engstate = getState(); + switch (engstate) { + case STOPPED: + case STOPPING: + case UNDEFINED: + return false; + case EXECUTING: + case READY: + return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted(); + default: + break; + } + return false; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped() + */ + @Override + public boolean isStopped() { + return isStopped(this.getKey()); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. + * model. basicmodel.concepts.AxArtifactKey) + */ + @Override + public boolean isStopped(final AxArtifactKey engineKey) { + final AxEngineState engstate = getState(); + switch (engstate) { + case STOPPING: + case UNDEFINED: + case EXECUTING: + case READY: + return false; + case STOPPED: + return processorThread == null || !processorThread.isAlive(); + default: + break; + } + return false; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long) + */ + @Override + public void startPeriodicEvents(final long period) { + throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents() + */ + @Override + public void stopPeriodicEvents() { + throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker"); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core + * .model .concepts.AxArtifactKey) + */ + @Override + public String getStatus(final AxArtifactKey engineKey) { + // Get the information from the engine that we want to return + final AxEngineModel apexEngineModel = engine.getEngineStatus(); + apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel); + + // Convert that information into a string + try { + final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); + final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class); + modelWriter.write(apexEngineModel, baOutputStream); + return baOutputStream.toString(); + } catch (final Exception e) { + LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e); + return null; + } + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex + * .core.model.concepts.AxArtifactKey) + */ + @Override + public String getRuntimeInfo(final AxArtifactKey engineKey) { + // We'll build up the JSON string for runtime information bit by bit + final StringBuilder runtimeJsonStringBuilder = new StringBuilder(); + + // Get the engine information + final AxEngineModel engineModel = engine.getEngineStatus(); + final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext(); + + // Use GSON to convert our context information into JSON + final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + // Get context into a JSON string + runtimeJsonStringBuilder.append("{\"TimeStamp\":"); + runtimeJsonStringBuilder.append(engineModel.getTimestamp()); + runtimeJsonStringBuilder.append(",\"State\":"); + runtimeJsonStringBuilder.append(engineModel.getState()); + runtimeJsonStringBuilder.append(",\"Stats\":"); + runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats())); + + // Get context into a JSON string + runtimeJsonStringBuilder.append(",\"ContextAlbums\":["); + + boolean firstAlbum = true; + for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) { + if (firstAlbum) { + firstAlbum = false; + } else { + runtimeJsonStringBuilder.append(","); + } + + runtimeJsonStringBuilder.append("{\"AlbumKey\":"); + runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey())); + runtimeJsonStringBuilder.append(",\"AlbumContent\":["); + + + // Get the schema helper to use to marshal context album objects to JSON + final AxContextAlbum axContextAlbum = + ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey()); + SchemaHelper schemaHelper = null; + + try { + // Get a schema helper to manage the translations between objects on the album map + // for this album + schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(), + axContextAlbum.getItemSchema()); + } catch (final ContextRuntimeException e) { + final String resultString = + "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON"; + LOGGER.warn(resultString, e); + + // End of context album entry + runtimeJsonStringBuilder.append(resultString); + runtimeJsonStringBuilder.append("]}"); + + continue; + } + + boolean firstEntry = true; + for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) { + if (firstEntry) { + firstEntry = false; + } else { + runtimeJsonStringBuilder.append(","); + } + runtimeJsonStringBuilder.append("{\"EntryName\":"); + runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey())); + runtimeJsonStringBuilder.append(",\"EntryContent\":"); + runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2Json(contextEntry.getValue()))); + + // End of context entry + runtimeJsonStringBuilder.append("}"); + } + + // End of context album entry + runtimeJsonStringBuilder.append("]}"); + } + + runtimeJsonStringBuilder.append("]}"); + + // Tidy up the JSON string + final JsonParser jsonParser = new JsonParser(); + final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString()); + final String tidiedRuntimeString = gson.toJson(jsonElement); + + LOGGER.debug("runtime information=" + tidiedRuntimeString); + + return tidiedRuntimeString; + } + + /** + * This is an event processor thread, this class decouples the events handling logic from core + * business logic. This class runs its own thread and continuously querying the blocking queue + * for the events that have been sent to the worker for processing by the Apex engine. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + private class EventProcessor implements Runnable { + private final boolean debugEnabled = LOGGER.isDebugEnabled(); + // the events queue + private BlockingQueue<ApexEvent> eventProcessingQueue = null; + + /** + * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects. + * + * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger + * events. + */ + EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) { + this.eventProcessingQueue = eventProcessingQueue; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + LOGGER.debug("Engine {} processing ... ", engineWorkerKey); + + // Take events from the event processing queue of the worker and pass them to the engine + // for processing + while (!processorThread.isInterrupted()) { + ApexEvent event = null; + try { + event = eventProcessingQueue.take(); + } catch (final InterruptedException e) { + LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey); + break; + } + + try { + if (event != null) { + if (debugEnabled) { + LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event); + } + final EnEvent enevent = apexEnEventConverter.fromApexEvent(event); + engine.handleEvent(enevent); + } + } catch (final ApexException e) { + LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e); + } catch (final Exception e) { + LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e); + break; + } + } + LOGGER.debug("Engine {} completed processing", engineWorkerKey); + } + } +} |