diff options
2 files changed, 69 insertions, 65 deletions
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java index 9cbd2050f..fd5fe131f 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ import org.slf4j.ext.XLoggerFactory; * @author Liam Fallon */ public class ApexEngineImpl implements ApexEngine { + // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEngineImpl.class); @@ -68,6 +69,7 @@ public class ApexEngineImpl implements ApexEngine { // The state of this engine private AxEngineState state = AxEngineState.STOPPED; + private final Object stateLockObj = new Object(); // call back listeners private final Map<String, EnEventListener> eventListeners = new LinkedHashMap<>(); @@ -89,7 +91,7 @@ public class ApexEngineImpl implements ApexEngine { protected ApexEngineImpl(final AxArtifactKey key) { argumentNotNull(key, "AxArtifactKey may not be null"); - LOGGER.entry("ApexEngine()->" + key.getId() + "," + state); + LOGGER.entry("ApexEngine()->{}, {}", key.getId(), state); this.key = key; @@ -106,16 +108,18 @@ public class ApexEngineImpl implements ApexEngine { @Override public void updateModel(final AxPolicyModel apexModel, final boolean isSubsequentInstance) throws ApexException { if (apexModel != null) { - LOGGER.entry("updateModel()->" + key.getId() + ", apexPolicyModel=" + apexModel.getKey().getId()); + LOGGER.entry("updateModel()->{}, apexPolicyModel {}", key.getId(), apexModel.getKey().getId()); } else { - LOGGER.warn(UPDATE_MODEL + key.getId() + ", Apex model not set"); throw new ApexException(UPDATE_MODEL + key.getId() + ", Apex model is not defined, it has a null value"); } // The engine must be stopped in order to do a model update - if (!state.equals(AxEngineState.STOPPED)) { - throw new ApexException(UPDATE_MODEL + key.getId() - + ", cannot update model, engine should be stopped but is in state " + state); + synchronized (stateLockObj) { + if (!state.equals(AxEngineState.STOPPED)) { + throw new ApexException( + UPDATE_MODEL + key.getId() + ", cannot update model, engine should be stopped but is in state " + + state); + } } // Create new internal context or update the existing one @@ -128,10 +132,8 @@ public class ApexEngineImpl implements ApexEngine { internalContext.update(apexModel, isSubsequentInstance); } } catch (final ContextException e) { - LOGGER.warn(UPDATE_MODEL + key.getId() + ", error setting the context for engine \"" + key.getId() + "\"", - e); - throw new ApexException(UPDATE_MODEL + key.getId() + ", error setting the context for engine \"" - + key.getId() + "\"", e); + throw new ApexException( + UPDATE_MODEL + key.getId() + ", error setting the context for engine \"" + key.getId() + "\"", e); } // Set up the state machines @@ -140,10 +142,8 @@ public class ApexEngineImpl implements ApexEngine { // always set up as new stateMachineHandler = new StateMachineHandler(internalContext); } catch (final StateMachineException e) { - LOGGER.warn(UPDATE_MODEL + key.getId() + ", error setting up the engine state machines \"" + key.getId() - + "\"", e); - throw new ApexException(UPDATE_MODEL + key.getId() + ", error setting up the engine state machines \"" - + key.getId() + "\"", e); + throw new ApexException( + UPDATE_MODEL + key.getId() + ", error setting up the engine state machines \"" + key.getId() + "\"", e); } LOGGER.exit(UPDATE_MODEL + key.getId()); @@ -154,20 +154,18 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public void start() throws ApexException { - LOGGER.entry("start()" + key); - - if (state != AxEngineState.STOPPED) { - String message = START + key.getId() + "," + state + ", cannot start engine, engine not in state STOPPED"; - LOGGER.warn(message); - throw new ApexException(message); + LOGGER.entry("start() {}", key); + synchronized (stateLockObj) { + if (state != AxEngineState.STOPPED) { + String message = + START + key.getId() + "," + state + ", cannot start engine, engine not in state STOPPED"; + throw new ApexException(message); + } } if (stateMachineHandler == null || internalContext == null) { - String message = START + key.getId() + "," + state - + ", cannot start engine, engine has not been initialized, its model is not loaded"; - LOGGER.warn(message); throw new ApexException(START + key.getId() + "," + state - + ", cannot start engine, engine has not been initialized, its model is not loaded"); + + ", cannot start engine, engine has not been initialized, its model is not loaded"); } // Set up the state machines @@ -176,9 +174,8 @@ public class ApexEngineImpl implements ApexEngine { stateMachineHandler.start(); engineStats.engineStart(); } catch (final StateMachineException e) { - String message = UPDATE_MODEL + key.getId() + ", error starting the engine state machines \"" + key.getId() - + "\""; - LOGGER.warn(message, e); + String message = + UPDATE_MODEL + key.getId() + ", error starting the engine state machines \"" + key.getId() + "\""; throw new ApexException(message, e); } @@ -193,20 +190,21 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public void stop() throws ApexException { - LOGGER.entry("stop()->" + key); + LOGGER.entry("stop()-> {}", key); // Check if the engine is already stopped - if (state == AxEngineState.STOPPED) { - throw new ApexException( - STOP + key.getId() + "," + state + ", cannot stop engine, engine is already stopped"); + synchronized (stateLockObj) { + if (state == AxEngineState.STOPPED) { + throw new ApexException( + STOP + key.getId() + "," + state + ", cannot stop engine, engine is already stopped"); + } } - // Stop the engine if it is in state READY, if it is in state EXECUTING, wait for execution to finish - for (int increment = ApexEngineConstants.STOP_EXECUTION_WAIT_TIMEOUT; - increment > 0; increment -= ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT) { + for (int increment = ApexEngineConstants.STOP_EXECUTION_WAIT_TIMEOUT; increment > 0; + increment -= ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT) { ThreadUtilities.sleep(ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT); - synchronized (state) { + synchronized (stateLockObj) { switch (state) { // Engine is OK to stop or has been stopped on return of an event case READY: @@ -227,14 +225,14 @@ public class ApexEngineImpl implements ApexEngine { break; default: - throw new ApexException(STOP + key.getId() + "," + state - + ", cannot stop engine, engine is in an undefined state"); + throw new ApexException( + STOP + key.getId() + "," + state + ", cannot stop engine, engine is in an undefined state"); } } } // Force the engine to STOPPED state - synchronized (state) { + synchronized (stateLockObj) { state = AxEngineState.STOPPED; } @@ -246,10 +244,12 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public void clear() throws ApexException { - LOGGER.entry("clear()->" + key); - if (state != AxEngineState.STOPPED) { - throw new ApexException("clear" + "()<-" + key.getId() + "," + state - + ", cannot clear engine, engine is not stopped"); + LOGGER.entry("clear()-> {}", key); + synchronized (stateLockObj) { + if (state != AxEngineState.STOPPED) { + throw new ApexException( + "clear" + "()<-" + key.getId() + "," + state + ", cannot clear engine, engine is not stopped"); + } } // Clear everything @@ -267,16 +267,18 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public EnEvent createEvent(final AxArtifactKey eventKey) { - if (state != AxEngineState.READY && state != AxEngineState.EXECUTING) { - LOGGER.warn("createEvent()<-{},{}, cannot create event, engine not in state READY", key.getId(), state); - return null; + synchronized (stateLockObj) { + if (state != AxEngineState.READY && state != AxEngineState.EXECUTING) { + LOGGER.warn("createEvent()<-{},{}, cannot create event, engine not in state READY", key.getId(), state); + return null; + } } try { // Create an event using the internal context return new EnEvent(eventKey); } catch (final Exception e) { - LOGGER.warn("createEvent()<-" + key.getId() + "," + state + ", error on event creation", e); + LOGGER.warn("createEvent()<-{},{}, error on event creation: ", key.getId(), state, e); return null; } } @@ -292,7 +294,7 @@ public class ApexEngineImpl implements ApexEngine { return ret; } - synchronized (state) { + synchronized (stateLockObj) { if (state != AxEngineState.READY) { LOGGER.warn("handleEvent()<-{},{}, cannot run engine, engine not in state READY", key.getId(), state); return ret; @@ -323,17 +325,17 @@ public class ApexEngineImpl implements ApexEngine { synchronized (eventListeners) { if (eventListeners.isEmpty()) { LOGGER.debug("handleEvent()<-{},{}, There is no listener registered to recieve outgoing event: {}", - key.getId(), state, outgoingEvent); + key.getId(), state, outgoingEvent); } for (final EnEventListener axEventListener : eventListeners.values()) { axEventListener.onEnEvent(outgoingEvent); } } } catch (final ApexException e) { - LOGGER.warn("handleEvent()<-" + key.getId() + "," + state + ", outgoing event publishing error: ", e); + LOGGER.warn("handleEvent()<-{},{}, outgoing event publishing error: ", key.getId(), state, e); ret = false; } - synchronized (state) { + synchronized (stateLockObj) { // Only go to READY if we are still in state EXECUTING, we go to state STOPPED if we were STOPPING if (state == AxEngineState.EXECUTING) { state = AxEngineState.READY; @@ -351,13 +353,11 @@ public class ApexEngineImpl implements ApexEngine { public void addEventListener(final String listenerName, final EnEventListener listener) { if (listenerName == null) { String message = "addEventListener()<-" + key.getId() + "," + state + ", listenerName is null"; - LOGGER.warn(message); throw new ApexRuntimeException(message); } if (listener == null) { String message = "addEventListener()<-" + key.getId() + "," + state + ", listener is null"; - LOGGER.warn(message); throw new ApexRuntimeException(message); } @@ -371,7 +371,6 @@ public class ApexEngineImpl implements ApexEngine { public void removeEventListener(final String listenerName) { if (listenerName == null) { String message = "removeEventListener()<-" + key.getId() + "," + state + ", listenerName is null"; - LOGGER.warn(message); throw new ApexRuntimeException(message); } @@ -418,7 +417,7 @@ public class ApexEngineImpl implements ApexEngine { } for (final Entry<AxArtifactKey, ContextAlbum> contextAlbumEntry : internalContext.getContextAlbums() - .entrySet()) { + .entrySet()) { currentContext.put(contextAlbumEntry.getKey(), contextAlbumEntry.getValue()); } @@ -437,7 +436,7 @@ public class ApexEngineImpl implements ApexEngine { /** * Create an exception event from the incoming event including the exception information on the event. * - * @param incomingEvent The incoming event that caused the exception + * @param incomingEvent The incoming event that caused the exception * @param eventException The exception that was thrown * @return the exception event */ diff --git a/testsuites/performance/performance-benchmark-test/src/main/java/org/onap/policy/apex/testsuites/performance/benchmark/eventgenerator/EventGeneratorEndpoint.java b/testsuites/performance/performance-benchmark-test/src/main/java/org/onap/policy/apex/testsuites/performance/benchmark/eventgenerator/EventGeneratorEndpoint.java index ed624fb83..3d5adfc09 100644 --- a/testsuites/performance/performance-benchmark-test/src/main/java/org/onap/policy/apex/testsuites/performance/benchmark/eventgenerator/EventGeneratorEndpoint.java +++ b/testsuites/performance/performance-benchmark-test/src/main/java/org/onap/policy/apex/testsuites/performance/benchmark/eventgenerator/EventGeneratorEndpoint.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +25,7 @@ import com.google.gson.Gson; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; import javax.inject.Provider; import javax.ws.rs.GET; @@ -42,11 +44,13 @@ import org.slf4j.ext.XLoggerFactory; */ @Path("/") public class EventGeneratorEndpoint { + // Get a reference to the logger private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventGeneratorEndpoint.class); // Parameters for event generation - private static EventGeneratorParameters parameters = new EventGeneratorParameters(); + private static AtomicReference<EventGeneratorParameters> parameters = new AtomicReference<>( + new EventGeneratorParameters()); // The map of event batches sent in the test private static ConcurrentHashMap<Integer, EventBatch> batchMap = new ConcurrentHashMap<>(); @@ -59,6 +63,7 @@ public class EventGeneratorEndpoint { /** * Inject the HTTP request with a constructor. + * * @param httpRequest the current request */ @Inject @@ -72,9 +77,7 @@ public class EventGeneratorEndpoint { * @param incomingParameters the new parameters */ public static void setParameters(EventGeneratorParameters incomingParameters) { - synchronized (parameters) { - parameters = incomingParameters; - } + parameters.set(incomingParameters); } /** @@ -96,7 +99,7 @@ public class EventGeneratorEndpoint { @Path("/GetEvents") @GET public Response getEvents() { - ThreadUtilities.sleep(parameters.getDelayBetweenBatches()); + ThreadUtilities.sleep(parameters.get().getDelayBetweenBatches()); // Check if event generation is finished if (isFinished()) { @@ -104,12 +107,12 @@ public class EventGeneratorEndpoint { } // A batch count of 0 means to continue to handle events for ever - if (parameters.getBatchCount() > 0 && batchMap.size() >= parameters.getBatchCount()) { + if (parameters.get().getBatchCount() > 0 && batchMap.size() >= parameters.get().getBatchCount()) { setFinished(true); return Response.status(204).build(); } - EventBatch batch = new EventBatch(parameters.getBatchSize(), getApexClient()); + EventBatch batch = new EventBatch(parameters.get().getBatchSize(), getApexClient()); batchMap.put(batch.getBatchNumber(), batch); return Response.status(200).entity(batch.getBatchAsJsonString()).build(); @@ -144,12 +147,13 @@ public class EventGeneratorEndpoint { * @return the Apex client */ private String getApexClient() { - return httpRequest.get().getRemoteHost() + '(' + httpRequest.get().getRemoteAddr() + "):" - + httpRequest.get().getRemotePort(); + return httpRequest.get().getRemoteHost() + '(' + httpRequest.get().getRemoteAddr() + "):" + httpRequest.get() + .getRemotePort(); } /** * Get event generation statistics. + * * @return the statistics on event generation */ protected static String getEventGenerationStats() { @@ -165,6 +169,7 @@ public class EventGeneratorEndpoint { /** * Check if event generation has finished. + * * @return true if event generation has finished */ protected static boolean isFinished() { |