From e1735a6044b9a0b5ec8e3afede88744a7d57782a Mon Sep 17 00:00:00 2001 From: "ning.xi" Date: Tue, 25 Feb 2020 16:55:22 +0800 Subject: remove duplication code in plugins-event producer Issue-ID: POLICY-1884 Change-Id: I7774dfdc7d7edcc0409a817bfd21ddacec21d085 Signed-off-by: ning.xi --- .../event/carrier/kafka/ApexKafkaProducer.java | 42 ++---------- .../carrier/restclient/ApexRestClientProducer.java | 44 ++----------- .../restrequestor/ApexRestRequestorProducer.java | 43 ++----------- .../carrier/websocket/ApexWebSocketProducer.java | 42 ++---------- .../engine/event/ApexPluginsEventProducer.java | 74 ++++++++++++++++++++++ .../producer/ApexFileEventProducer.java | 50 ++------------- 6 files changed, 97 insertions(+), 198 deletions(-) create mode 100644 services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventProducer.java diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index 98ff61121..bb5398ceb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; @@ -41,7 +42,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexKafkaProducer implements ApexEventProducer { +public class ApexKafkaProducer extends ApexPluginsEventProducer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); @@ -52,12 +53,6 @@ public class ApexKafkaProducer implements ApexEventProducer { // The Kafka Producer used to send events using Kafka private Producer kafkaProducer; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - @Override public void init(final String producerName, final EventHandlerParameters producerParameters) throws ApexEventException { @@ -73,42 +68,13 @@ public class ApexKafkaProducer implements ApexEventProducer { (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); } - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - /** * {@inheritDoc}. */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); // Kafka producer must be started in the same thread as it is stopped, so we must start it here if (kafkaProducer == null) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java index ea60e3915..3506ace3c 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.core.Response; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; * @author Joss Armstrong (joss.armstrong@ericsson.com) * */ -public class ApexRestClientProducer implements ApexEventProducer { +public class ApexRestClientProducer extends ApexPluginsEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestClientProducer.class); // The HTTP client that makes a REST call with an event from Apex @@ -57,12 +57,6 @@ public class ApexRestClientProducer implements ApexEventProducer { // The REST carrier properties private RestClientCarrierTechnologyParameters restProducerProperties; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - /** * {@inheritDoc}. */ @@ -103,39 +97,9 @@ public class ApexRestClientProducer implements ApexEventProducer { /** * {@inheritDoc}. */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); String untaggedUrl = restProducerProperties.getUrl(); if (executionProperties != null) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java index 59a9ac971..3e2cd5a94 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import java.util.Properties; import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; @@ -42,18 +42,12 @@ import org.slf4j.LoggerFactory; * @author Liam Fallon (liam.fallon@ericsson.com) * */ -public class ApexRestRequestorProducer implements ApexEventProducer { +public class ApexRestRequestorProducer extends ApexPluginsEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorProducer.class); // The REST carrier properties private RestRequestorCarrierTechnologyParameters restProducerProperties; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - // The number of events sent private int eventsSent = 0; @@ -100,14 +94,6 @@ public class ApexRestRequestorProducer implements ApexEventProducer { } } - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - /** * Get the number of events sent to date. * @@ -117,34 +103,13 @@ public class ApexRestRequestorProducer implements ApexEventProducer { return eventsSent; } - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - /** * {@inheritDoc}. */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); // Find the peered consumer for this producer final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java index 407d73705..422c91bc3 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +31,7 @@ import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStri import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer; import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessager; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; @@ -43,19 +44,13 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessageListener { +public class ApexWebSocketProducer extends ApexPluginsEventProducer implements WsStringMessageListener { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexWebSocketProducer.class); // The web socket messager, may be WS a server or a client private WsStringMessager wsStringMessager; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - @Override public void init(final String producerName, final EventHandlerParameters producerParameters) throws ApexEventException { @@ -90,42 +85,13 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage } } - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - /** * {@inheritDoc}. */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event ); wsStringMessager.sendString((String) event); } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventProducer.java new file mode 100644 index 000000000..06c3d4163 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventProducer.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +import java.util.EnumMap; +import java.util.Map; +import java.util.Properties; + +public abstract class ApexPluginsEventProducer implements ApexEventProducer { + // The name for this producer + protected String name = null; + // The peer references for this event handler + protected Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + /** + * {@inheritDoc}. + */ + @Override + public String getName() { + return name; + } + + /** + * {@inheritDoc}. + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /** + * {@inheritDoc}. + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /** + * {@inheritDoc}. + */ + @Override + public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, + final Object event) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + } + +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java index 5209f6453..8f21a4fb9 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import java.util.Properties; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; @@ -45,26 +45,19 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexFileEventProducer implements ApexEventProducer { +public class ApexFileEventProducer extends ApexPluginsEventProducer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventProducer.class); - // The name for this producer - private String producerName = null; - // The output stream to write events to private PrintStream eventOutputStream; - // The peer references for this event handler - private final Map peerReferenceMap = - new EnumMap<>(EventHandlerPeeredMode.class); - /** * {@inheritDoc}. */ @Override - public void init(final String name, final EventHandlerParameters producerParameters) throws ApexEventException { - producerName = name; + public void init(final String producerName, final EventHandlerParameters producerParameters) throws ApexEventException { + this.name = producerName; // Get and check the Apex parameters from the parameter service if (producerParameters == null) { @@ -105,42 +98,13 @@ public class ApexFileEventProducer implements ApexEventProducer { } } - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return producerName; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - /** * {@inheritDoc}. */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); // Cast the event to a string, if our conversion is correctly configured, this cast should // always work @@ -148,7 +112,7 @@ public class ApexFileEventProducer implements ApexEventProducer { try { stringEvent = (String) event; } catch (final Exception e) { - final String errorMessage = "error in ApexFileProducer \"" + producerName + "\" while transferring event \"" + final String errorMessage = "error in ApexFileProducer \"" + name + "\" while transferring event \"" + event + "\" to the output stream"; LOGGER.debug(errorMessage, e); throw new ApexEventRuntimeException(errorMessage, e); -- cgit 1.2.3-korg