diff options
8 files changed, 97 insertions, 321 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java index cbabab3b8..ff300423f 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ package org.onap.policy.apex.plugins.event.carrier.jms; -import java.util.EnumMap; -import java.util.Map; import java.util.Properties; import javax.jms.Connection; @@ -34,15 +32,12 @@ import javax.jms.Session; import javax.jms.Topic; import javax.naming.InitialContext; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +46,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runnable { +public class ApexJmsConsumer extends ApexPluginsEventConsumer implements MessageListener { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class); @@ -61,22 +56,12 @@ public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runn // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - // The connection to the JMS server private Connection connection; // The topic on which we receive events from JMS private Topic jmsIncomingTopic; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, final ApexEventReceiver incomingEventReceiver) throws ApexEventException { @@ -148,42 +133,6 @@ public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runn * {@inheritDoc}. */ @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void run() { // JMS session and message consumer for receiving messages try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index a99258a48..947dd5466 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +21,16 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; -import java.util.EnumMap; -import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { +public class ApexKafkaConsumer extends ApexPluginsEventConsumer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class); @@ -57,16 +52,6 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { // The Kafka consumer used to receive events using Kafka private KafkaConsumer<String, String> kafkaConsumer; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - /** * {@inheritDoc}. */ @@ -97,41 +82,6 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { } } - /** - * {@inheritDoc}. - */ - @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } /** * {@inheritDoc}. diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java index aaad52954..aa8185fdb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ package org.onap.policy.apex.plugins.event.carrier.restclient; -import java.util.EnumMap; -import java.util.Map; import java.util.Properties; import java.util.regex.Matcher; @@ -32,15 +30,12 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +44,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { +public class ApexRestClientConsumer extends ApexPluginsEventConsumer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestClientConsumer.class); @@ -68,19 +63,9 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { // The HTTP client that makes a REST call to get an input event for Apex private Client client; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - // The pattern for filtering status code private Pattern httpCodeFilterPattern = null; - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, final ApexEventReceiver incomingEventReceiver) throws ApexEventException { @@ -120,42 +105,6 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { * {@inheritDoc}. */ @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void run() { // The RequestRunner thread runs the get request for the event Thread requestRunnerThread = null; diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index e382c0269..57560d2ef 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; -import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -45,13 +44,11 @@ import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; @@ -63,7 +60,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { +public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorConsumer.class); @@ -86,16 +83,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // The HTTP client that makes a REST call to get an input event for Apex private Client client; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - // Temporary request holder for incoming REST requests private final BlockingQueue<ApexRestRequest> incomingRestRequestQueue = new LinkedBlockingQueue<>(); @@ -197,26 +184,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { } /** - * {@inheritDoc}. - */ - @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** * Get the number of events received to date. * * @return the number of events received @@ -229,22 +196,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { * {@inheritDoc}. */ @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void run() { // The endless loop that receives events using REST calls while (consumerThread.isAlive() && !stopOrderedFlag) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java index c8a07f234..95037160a 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,17 +21,13 @@ package org.onap.policy.apex.plugins.event.carrier.restserver; -import java.util.EnumMap; -import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; import javax.ws.rs.core.Response; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; @@ -46,7 +42,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { +public class ApexRestServerConsumer extends ApexPluginsEventConsumer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class); @@ -56,16 +52,6 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - // The local HTTP server to use for REST call reception if we are running a local Grizzly server private HttpServletServer server; @@ -156,42 +142,6 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { } /** - * {@inheritDoc}. - */ - @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** * Receive an event for processing in Apex. * * @param event the event to receive diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/test/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/test/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumerTest.java index 07f705cc6..91d6a0f4e 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/test/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/test/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumerTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; @@ -156,7 +157,7 @@ public class ApexRestServerConsumerTest { Field field = ApexRestServerConsumer.class.getDeclaredField("eventReceiver"); field.setAccessible(true); field.set(apexRestServerConsumer, apexEventReceiver); - field = ApexRestServerConsumer.class.getDeclaredField("name"); + field = ApexPluginsEventConsumer.class.getDeclaredField("name"); field.setAccessible(true); field.set(apexRestServerConsumer, "TestApexRestServerConsumer"); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java index 3cf44806a..949cd53d7 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +21,6 @@ package org.onap.policy.apex.plugins.event.carrier.websocket; -import java.util.EnumMap; -import java.util.Map; import java.util.Properties; import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; @@ -29,14 +28,11 @@ import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStri import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener; import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer; import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessager; -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +41,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessageListener, Runnable { +public class ApexWebSocketConsumer extends ApexPluginsEventConsumer implements WsStringMessageListener { private static final int WEB_SOCKET_WAIT_SLEEP_TIME = 100; // Get a reference to the logger @@ -57,16 +53,6 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - // The number of events read to date private int eventsRead = 0; @@ -107,42 +93,6 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage * {@inheritDoc}. */ @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void run() { while (consumerThread.isAlive() && !stopOrderedFlag) { ThreadUtilities.sleep(WEB_SOCKET_WAIT_SLEEP_TIME); diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java new file mode 100644 index 000000000..95a263e10 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.util.EnumMap; +import java.util.Map; +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +public abstract class ApexPluginsEventConsumer implements ApexEventConsumer, Runnable { + // The name for this consumer + protected String name = null; + + // The peer references for this event handler + protected Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // The consumer thread and stopping flag + protected Thread consumerThread; + protected boolean stopOrderedFlag = false; + + /** + * {@inheritDoc}. + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /** + * {@inheritDoc}. + */ + @Override + public String getName() { + return name; + } + + /** + * {@inheritDoc}. + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /** + * {@inheritDoc}. + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + +} |