diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main')
17 files changed, 83 insertions, 1393 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties index 2786a613..59c4b472 100644 --- a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties +++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties @@ -2,14 +2,14 @@ # ============LICENSE_START======================================================= # feature-pooling-dmaap # ================================================================================ -# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -65,34 +65,23 @@ # Topic used for inter-host communication for a particular controller # pooling.<controller-name>.topic=XXX -# These specify how the request id is to be extracted from each type of -# object that may be presented to a controller from shared topics -# (i.e., topics where hosts do not all receive a copy of the event) -extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId} - - # Each controller that is enabled should have its own topic and the -# corresponding ueb.xxx properties. However, for now, just assume that -# the amsterdam-cl and beijing-cl features will not both be enabled -# at the same time. - -pooling.amsterdam.enabled=true -pooling.amsterdam.topic=${env:POOLING_TOPIC} - -pooling.beijing.enabled=true -pooling.beijing.topic=${env:POOLING_TOPIC} +# corresponding dmaap.xxx properties. However, for now, just assume that +# the usecases features will not both be enabled at the same time. +pooling.usecases.enabled=true +pooling.usecases.topic=${env:POOLING_TOPIC} # the list of sources and sinks should be identical -ueb.source.topics=POOLING_TOPIC -ueb.sink.topics=POOLING_TOPIC - -ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS} -ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} -ueb.source.topics.POOLING_TOPIC.apiKey= -ueb.source.topics.POOLING_TOPIC.apiSecret= - -ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS} -ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} -ueb.sink.topics.POOLING_TOPIC.apiKey= -ueb.sink.topics.POOLING_TOPIC.apiSecret= +dmaap.source.topics=POOLING_TOPIC +dmaap.sink.topics=POOLING_TOPIC + +dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS} +dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} +dmaap.source.topics.POOLING_TOPIC.apiKey= +dmaap.source.topics.POOLING_TOPIC.apiSecret= + +dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS} +dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC} +dmaap.sink.topics.POOLING_TOPIC.apiKey= +dmaap.sink.topics.POOLING_TOPIC.apiSecret= diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index 9ba844ed..08c82fea 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.List; -import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; @@ -46,7 +45,7 @@ public class DmaapManager { /** * Topic source whose filter is to be manipulated. */ - private final FilterableTopicSource topicSource; + private final TopicSource topicSource; /** * Where to publish messages. @@ -79,9 +78,6 @@ public class DmaapManager { this.topicSource = findTopicSource(); this.topicSink = findTopicSink(); - // verify that we can set the filter - setFilter(null); - } catch (IllegalArgumentException e) { logger.error("failed to attach to topic {}", topic); throw new PoolingFeatureException(e); @@ -98,15 +94,10 @@ public class DmaapManager { * @return the topic source * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ - private FilterableTopicSource findTopicSource() throws PoolingFeatureException { + private TopicSource findTopicSource() throws PoolingFeatureException { for (TopicSource src : getTopicSources()) { if (topic.equals(src.getTopic())) { - if (src instanceof FilterableTopicSource) { - return (FilterableTopicSource) src; - - } else { - throw new PoolingFeatureException("topic source " + topic + " is not filterable"); - } + return src; } } @@ -199,22 +190,6 @@ public class DmaapManager { } /** - * Sets the server-side filter to be used by the consumer. - * - * @param filter the filter string, or {@code null} if no filter is to be used - * @throws PoolingFeatureException if the topic is not filterable - */ - public void setFilter(String filter) throws PoolingFeatureException { - try { - logger.debug("change filter for topic {} to {}", topic, filter); - topicSource.setFilter(filter); - - } catch (UnsupportedOperationException e) { - throw new PoolingFeatureException("cannot filter topic " + topic, e); - } - } - - /** * Publishes a message to the sink. * * @param msg message to be published diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 00b7a215..c7574925 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -77,11 +77,11 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF private final CountDownLatch activeLatch = new CountDownLatch(1); /** - * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called - * later. As multiple threads can be active within the methods at the same time, we must keep - * this in thread local storage. + * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is + * called later. As multiple threads can be active within the methods at the same + * time, we must keep this in thread local storage. */ - private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>(); + private ThreadLocal<String> offerTopics = new ThreadLocal<>(); /** * Constructor. @@ -236,19 +236,19 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF return false; } - if (mgr.beforeOffer(protocol, topic2, event)) { + if (mgr.beforeOffer(topic2, event)) { return true; } - offerArgs.set(new OfferArgs(protocol, topic2, event)); + offerTopics.set(topic2); return false; } @Override public boolean beforeInsert(DroolsController droolsController, Object fact) { - OfferArgs args = offerArgs.get(); - if (args == null) { + String topic = offerTopics.get(); + if (topic == null) { logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert"); return false; } @@ -279,7 +279,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF return false; } - return mgr.beforeInsert(args.protocol, args.topic, args.event, fact); + return mgr.beforeInsert(topic, fact); } @Override @@ -287,7 +287,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF boolean success) { // clear any stored arguments - offerArgs.remove(); + offerTopics.remove(); return false; } @@ -345,40 +345,6 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; } - /** - * Arguments captured from beforeOffer(). - */ - private static class OfferArgs { - - /** - * Protocol of the receiving topic. - */ - private CommInfrastructure protocol; - - /** - * Topic on which the event was received. - */ - private String topic; - - /** - * The event text that was received on the topic. - */ - private String event; - - /** - * Constructor. - * - * @param protocol protocol - * @param topic topic - * @param event the actual event data received on the topic - */ - public OfferArgs(CommInfrastructure protocol, String topic, String event) { - this.protocol = protocol; - this.topic = topic; - this.event = event; - } - } - /* * The remaining methods may be overridden by junit tests. */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java index e3576b8f..cc8e3a4d 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; @@ -84,13 +83,6 @@ public interface PoolingManager { void publish(String channel, Message msg); /** - * Handles a {@link Forward} event that was received from the internal topic. - * - * @param event event - */ - void handle(Forward event); - - /** * Schedules a timer to fire after a delay. * * @param delayMs delay, in milliseconds diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index b2966101..9093c7c0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling; import com.google.gson.JsonParseException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.utils.properties.SpecProperties; import org.onap.policy.drools.controller.DroolsController; -import org.onap.policy.drools.pooling.extractor.ClassExtractors; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; @@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final PolicyController controller; /** - * Where to offer events that have been forwarded to this host (i.e, the controller). - */ - private final TopicListener listener; - - /** * Decremented each time the manager enters the Active state. Used by junit tests. */ private final CountDownLatch activeLatch; @@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final DmaapManager dmaapMgr; /** - * Used to extract the request id from the decoded message. - */ - private final ClassExtractors extractors; - - /** * Lock used while updating {@link #current}. In general, public methods must use * this, while private methods assume the lock is already held. */ @@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private ScheduledThreadPoolExecutor scheduler = null; /** - * {@code True} if events offered by the controller should be intercepted, - * {@code false} otherwise. - */ - private boolean intercept = true; - - /** * Constructs the manager, initializing all of the data structures. * * @param host name/uuid of this host @@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.activeLatch = activeLatch; try { - this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); - this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource())); this.dmaapMgr = makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); @@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Makes properties for configuring extractors. - * - * @param controller the controller for which the extractors will be configured - * @param source properties from which to get the extractor properties - * @return extractor properties - */ - private Properties makeExtractorProps(PolicyController controller, Properties source) { - return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source); - } - - /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. */ @@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { current.cancelTimers(); current = newState; - // set the filter before starting the state - setFilter(newState.getFilter()); newState.start(); } } - /** - * Sets the server-side filter for the internal topic. - * - * @param filter new filter to be used - */ - private void setFilter(Map<String, Object> filter) { - try { - dmaapMgr.setFilter(serializer.encodeFilter(filter)); - - } catch (JsonParseException e) { - logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e); - - } catch (PoolingFeatureException e) { - logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e); - } - } - @Override public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) { // wrap the task in a TimerAction and schedule it @@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle * it instead, as it already has the decoded message. * - * @param protocol protocol * @param topic2 topic * @param event event * @return {@code true} if the event was handled by the manager, {@code false} if it * must still be handled by the invoker */ - public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) { + public boolean beforeOffer(String topic2, String event) { - if (!controller.isLocked() || !intercept) { + if (!controller.isLocked()) { // we should NOT intercept this message - let the invoker handle it return false; } - return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event))); + return handleExternal(topic2, decodeEvent(topic2, event)); } /** * Called by the DroolsController before it inserts the event into the rule engine. * - * @param protocol protocol * @param topic2 topic - * @param event original event text, as received from the Bus - * @param event2 event, as an object + * @param event event, as an object * @return {@code true} if the event was handled by the manager, {@code false} if it * must still be handled by the invoker */ - public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) { - - if (!intercept) { - // we should NOT intercept this message - let the invoker handle it - return false; - } - - return handleExternal(protocol, topic2, event, extractRequestId(event2)); + public boolean beforeInsert(String topic2, Object event) { + return handleExternal(topic2, event); } /** * Handles an event from an external topic. * - * @param protocol protocol * @param topic2 topic - * @param event event - * @param reqid request id extracted from the event, or {@code null} if it couldn't be - * extracted + * @param event event, as an object, or {@code null} if it cannot be decoded * @return {@code true} if the event was handled by the manager, {@code false} if it * must still be handled by the invoker */ - private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) { - if (reqid == null) { - // no request id - let the invoker handle it - return false; - } - - if (reqid.isEmpty()) { - logger.warn("handle locally due to empty request id for topic {}", topic2); - // no request id - let the invoker handle it + private boolean handleExternal(String topic2, Object event) { + if (event == null) { + // no event - let the invoker handle it return false; } - Forward ev = makeForward(protocol, topic2, event, reqid); - if (ev == null) { - // invalid args - consume the message - logger.warn("constructed an invalid Forward message on topic {}", getTopic()); - return true; - } - synchronized (curLocker) { - return handleExternal(ev); + return handleExternal(topic2, event, event.hashCode()); } } /** * Handles an event from an external topic. * - * @param event event + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code * @return {@code true} if the event was handled, {@code false} if the invoker should * handle it */ - private boolean handleExternal(Forward event) { + private boolean handleExternal(String topic2, Object event, int eventHashCode) { if (assignments == null) { // no bucket assignments yet - handle locally - logger.info("handle event locally for request {}", event.getRequestId()); + logger.info("handle event locally for request {}", event); // we did NOT consume the event return false; } else { - return handleEvent(event); + return handleEvent(topic2, event, eventHashCode); } } /** * Handles a {@link Forward} event, possibly forwarding it again. * - * @param event event + * @param topic2 topic + * @param event event, as an object + * @param eventHashCode event's hash code * @return {@code true} if the event was handled, {@code false} if the invoker should * handle it */ - private boolean handleEvent(Forward event) { - String target = assignments.getAssignedHost(event.getRequestId().hashCode()); + private boolean handleEvent(String topic2, Object event, int eventHashCode) { + String target = assignments.getAssignedHost(eventHashCode); if (target == null) { /* * This bucket has no assignment - just discard the event */ - logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic()); + logger.warn("discarded event for unassigned bucket from topic {}", topic2); return true; } @@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /* * Message belongs to this host - allow the controller to handle it. */ - logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic()); + logger.info("handle local event for request {} from topic {}", event, topic2); return false; } - // forward to a different host, if hop count has been exhausted - if (event.getNumHops() > MAX_HOPS) { - logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS, - topic); - - } else { - logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); - event.bumpNumHops(); - publish(target, event); - } - - // either way, consume the event + // not our message, consume the event + logger.warn("discarded event for host {} from topic {}", target, topic2); return true; } /** - * Extract the request id from an event object. - * - * @param event the event object, or {@code null} - * @return the event's request id, or {@code null} if it can't be extracted - */ - private String extractRequestId(Object event) { - if (event == null) { - return null; - } - - Object reqid = extractors.extract(event); - return (reqid != null ? reqid.toString() : null); - } - - /** * Decodes an event from a String into an event Object. * * @param topic2 topic @@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Makes a {@link Forward}, and validates its contents. - * - * @param protocol protocol - * @param topic2 topic - * @param event event - * @param reqid request id - * @return a new message, or {@code null} if the message was invalid - */ - private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) { - try { - Forward ev = new Forward(host, protocol, topic2, event, reqid); - - // required for the validity check - ev.setChannel(host); - - ev.checkValidity(); - - return ev; - - } catch (PoolingFeatureException e) { - logger.error("invalid message for topic {}", topic2, e); - return null; - } - } - - @Override - public void handle(Forward event) { - synchronized (curLocker) { - if (!handleExternal(event)) { - // this host should handle it - inject it - inject(event); - } - } - } - - /** - * Injects an event into the controller. - * - * @param event event - */ - private void inject(Forward event) { - logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic()); - - try { - intercept = false; - listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload()); - - } finally { - intercept = true; - } - } - - /** * Handles an event from the internal topic. This uses reflection to identify the * appropriate process() method to invoke, based on the type of Message that was * decoded. @@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } } - /* - * The remaining methods may be overridden by junit tests. - */ - - /** - * Creates object extractors. - * - * @param props properties used to configure the extractors - * @return a new set of extractors - */ - protected ClassExtractors makeClassExtractors(Properties props) { - return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX, - PoolingProperties.EXTRACTOR_TYPE); - } - /** * Creates a DMaaP manager. * diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java index 80eee501..4627b9eb 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,7 +25,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParseException; import java.util.HashMap; import java.util.Map; -import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Heartbeat; import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; @@ -60,7 +59,6 @@ public class Serializer { private static final Map<String, Class<? extends Message>> type2class = new HashMap<>(); static { - class2type.put(Forward.class, "forward"); class2type.put(Heartbeat.class, "heartbeat"); class2type.put(Identification.class, "identification"); class2type.put(Leader.class, "leader"); @@ -79,7 +77,7 @@ public class Serializer { /** * Encodes a filter. - * + * * @param filter filter to be encoded * @return the filter, serialized as a JSON string */ @@ -89,7 +87,7 @@ public class Serializer { /** * Encodes a message. - * + * * @param msg message to be encoded * @return the message, serialized as a JSON string */ @@ -108,7 +106,7 @@ public class Serializer { /** * Decodes a JSON string into a Message. - * + * * @param msg JSON string representing the message * @return the message */ diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java deleted file mode 100644 index bd75995f..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019-2020 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Extractors for each object class. Properties define how the data is to be - * extracted for a given class, where the properties are similar to the - * following: - * - * <pre> - * <code><a.prefix>.<class.name> = ${event.reqid}</code> - * </pre> - * - * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()" - * method to extract the specified field. If that fails, then it looks for a public field - * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it - * simply uses the "get(field-name)" method to extract the data from the map. - */ -public class ClassExtractors { - - private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class); - - /** - * Properties that specify how the data is to be extracted from a given - * class. - */ - private final Properties properties; - - /** - * Property prefix, including a trailing ".". - */ - private final String prefix; - - /** - * Type of item to be extracted. - */ - private final String type; - - /** - * Maps the class name to its extractor. - */ - private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>(); - - /** - * Constructor. - * - * @param props properties that specify how the data is to be extracted from - * a given class - * @param prefix property name prefix, prepended before the class name - * @param type type of item to be extracted - */ - public ClassExtractors(Properties props, String prefix, String type) { - this.properties = props; - this.prefix = (prefix.endsWith(".") ? prefix : prefix + "."); - this.type = type; - } - - /** - * Gets the number of extractors in the map. - * - * @return gets the number of extractors in the map - */ - protected int size() { - return class2extractor.size(); - } - - /** - * Extracts the desired data item from an object. - * - * @param object object from which to extract the data item - * @return the extracted item, or {@code null} if it could not be extracted - */ - public Object extract(Object object) { - if (object == null) { - return null; - } - - Extractor ext = getExtractor(object); - - return ext.extract(object); - } - - /** - * Gets the extractor for the given type of object, creating one if it - * doesn't exist yet. - * - * @param object object whose extracted is desired - * @return an extractor for the object - */ - private Extractor getExtractor(Object object) { - Class<?> clazz = object.getClass(); - Extractor ext = class2extractor.get(clazz.getName()); - - if (ext == null) { - // allocate a new extractor, if another thread doesn't beat us to it - ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz)); - } - - return ext; - } - - /** - * Builds an extractor for the class. - * - * @param clazz class for which the extractor should be built - * - * @return a new extractor - */ - private Extractor buildExtractor(Class<?> clazz) { - String value = properties.getProperty(prefix + clazz.getName(), null); - if (value != null) { - // property has config info for this class - build the extractor - return buildExtractor(clazz, value); - } - - /* - * Get the extractor, if any, for the super class or interfaces, but - * don't add one if it doesn't exist - */ - Extractor ext = getClassExtractor(clazz, false); - if (ext != null) { - return ext; - } - - /* - * No extractor defined for for this class or its super class - we - * cannot extract data items from objects of this type, so just - * allocated a null extractor. - */ - logger.warn("missing property {}{}", prefix, clazz.getName()); - return new NullExtractor(); - } - - /** - * Builds an extractor for the class, based on the config value extracted - * from the corresponding property. - * - * @param clazz class for which the extractor should be built - * @param value config value (e.g., "${event.request.id}" - * @return a new extractor - */ - private Extractor buildExtractor(Class<?> clazz, String value) { - if (!value.startsWith("${")) { - logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'"); - return new NullExtractor(); - } - - if (!value.endsWith("}")) { - logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName()); - return new NullExtractor(); - } - - // get the part in the middle - String val = value.substring(2, value.length() - 1); - if (val.startsWith(".")) { - logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName()); - return new NullExtractor(); - } - - if (val.endsWith(".")) { - logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName()); - return new NullExtractor(); - } - - // everything's valid - create the extractor - try { - ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]")); - - /* - * If there's only one extractor, then just return it, otherwise - * return the whole extractor. - */ - return (ext.extractors.length == 1 ? ext.extractors[0] : ext); - - } catch (ExtractorException e) { - logger.warn("cannot build extractor for {}", clazz.getName(), e); - return new NullExtractor(); - } - } - - /** - * Gets the extractor for a class, examining all super classes and - * interfaces. - * - * @param clazz class whose extractor is desired - * @param addOk {@code true} if the extractor may be added, provided the - * property is defined, {@code false} otherwise - * @return the extractor to be used for the class, or {@code null} if no - * extractor has been defined yet - */ - private Extractor getClassExtractor(Class<?> clazz, boolean addOk) { - if (clazz == null) { - return null; - } - - Extractor ext = null; - - if (addOk) { - String val = properties.getProperty(prefix + clazz.getName(), null); - - if (val != null) { - /* - * A property is defined for this class, so create the extractor - * for it. - */ - return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz)); - } - } - - // see if the superclass has an extractor - if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) { - return ext; - } - - // check the interfaces, too - for (Class<?> clz : clazz.getInterfaces()) { - if ((ext = getClassExtractor(clz, true)) != null) { - break; - } - } - - return ext; - } - - /** - * Extractor that always returns {@code null}. Used when no extractor could - * be built for a given object type. - */ - private class NullExtractor implements Extractor { - - @Override - public Object extract(Object object) { - logger.info("cannot extract {} from {}", type, object.getClass()); - return null; - } - } - - /** - * Component-ized extractor. Extracts an object that is referenced - * hierarchically, where each name identifies a particular component within - * the hierarchy. Supports retrieval from {@link Map} objects, as well as - * via getXxx() methods, or by direct field retrieval. - * - * <p>Note: this will <i>not</i> work if POJOs are contained within a Map. - */ - private class ComponetizedExtractor implements Extractor { - - /** - * Extractor for each component. - */ - private final Extractor[] extractors; - - /** - * Constructor. - * - * @param clazz the class associated with the object at the root of the - * hierarchy - * @param names name associated with each component - * @throws ExtractorException extractor exception - */ - public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException { - this.extractors = new Extractor[names.length]; - - Class<?> clz = clazz; - - for (int x = 0; x < names.length; ++x) { - String comp = names[x]; - - Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp); - - extractors[x] = pair.getLeft(); - clz = pair.getRight(); - } - } - - /** - * Builds an extractor for the given component of an object. - * - * @param clazz type of object from which the component will be - * extracted - * @param comp name of the component to extract - * @return a pair containing the extractor and the extracted object's - * type - * @throws ExtractorException extrator exception - */ - private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException { - - Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp); - - if (pair == null) { - pair = getFieldExtractor(clazz, comp); - } - - if (pair == null) { - pair = getMapExtractor(clazz, comp); - } - - - // didn't find an extractor - if (pair == null) { - throw new ExtractorException("class " + clazz + " contains no element " + comp); - } - - return pair; - } - - @Override - public Object extract(Object object) { - Object obj = object; - - for (Extractor ext : extractors) { - if (obj == null) { - break; - } - - obj = ext.extract(obj); - } - - return obj; - } - - /** - * Gets an extractor that invokes a getXxx() method to retrieve the - * object. - * - * @param clazz container's class - * @param name name of the property to be retrieved - * @return a new extractor, or {@code null} if the class does not - * contain the corresponding getXxx() method - * @throws ExtractorException if the getXxx() method is inaccessible - */ - private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException { - Method meth; - - String nm = "get" + StringUtils.capitalize(name); - - try { - meth = clazz.getMethod(nm); - - Class<?> retType = meth.getReturnType(); - if (retType == void.class) { - // it's a void method, thus it won't return an object - return null; - } - - return Pair.of(new MethodExtractor(meth), retType); - - } catch (NoSuchMethodException expected) { - // no getXxx() method, maybe there's a field by this name - logger.debug("no method {} in {}", nm, clazz.getName(), expected); - return null; - - } catch (SecurityException e) { - throw new ExtractorException("inaccessible method " + clazz + "." + nm, e); - } - } - - /** - * Gets an extractor for a field within the object. - * - * @param clazz container's class - * @param name name of the field whose value is to be extracted - * @return a new extractor, or {@code null} if the class does not - * contain the given field - * @throws ExtractorException if the field is inaccessible - */ - private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException { - - Field field = getClassField(clazz, name); - if (field == null) { - return null; - } - - return Pair.of(new FieldExtractor(field), field.getType()); - } - - /** - * Gets an extractor for an item within a Map object. - * - * @param clazz container's class - * @param key item key within the map - * @return a new extractor, or {@code null} if the class is not a Map - * subclass - */ - private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) { - - if (!Map.class.isAssignableFrom(clazz)) { - return null; - } - - /* - * Don't know the value's actual type, so we'll assume it's a Map - * for now. Things should still work OK, as this is only used to - * direct the constructor on what type of extractor to create next. - * If the object turns out not to be a map, then the MapExtractor - * for the next component will just return null. - */ - return Pair.of(new MapExtractor(key), Map.class); - } - - /** - * Gets field within a class, examining all super classes and - * interfaces. - * - * @param clazz class whose field is desired - * @param name name of the desired field - * @return the field within the class, or {@code null} if the field does - * not exist - * @throws ExtractorException if the field is inaccessible - */ - private Field getClassField(Class<?> clazz, String name) throws ExtractorException { - if (clazz == null) { - return null; - } - - try { - return clazz.getField(name); - - } catch (NoSuchFieldException expected) { - // no field by this name - try super class & interfaces - logger.debug("no field {} in {}", name, clazz.getName(), expected); - return null; - - } catch (SecurityException e) { - throw new ExtractorException("inaccessible field " + clazz + "." + name, e); - } - } - } -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java deleted file mode 100644 index 5e02d602..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -/** - * Used to extract an object contained within another object. - */ -@FunctionalInterface -public interface Extractor { - - /** - * Extracts an object contained within another object. - * - * @param object object from which to extract the contained object - * @return the extracted value, or {@code null} if it cannot be extracted - */ - Object extract(Object object); -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java deleted file mode 100644 index a864672b..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -/** - * Exception generated by extractors. - */ -public class ExtractorException extends Exception { - private static final long serialVersionUID = 1L; - - public ExtractorException() { - super(); - } - - public ExtractorException(String message) { - super(message); - } - - public ExtractorException(Throwable cause) { - super(cause); - } - - public ExtractorException(String message, Throwable cause) { - super(message, cause); - } - - public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } - -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java deleted file mode 100644 index 9389ab22..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -import java.lang.reflect.Field; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to extract an object stored in one of the container's fields. - */ -public class FieldExtractor implements Extractor { - - private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class); - - /** - * Field containing the object. - */ - private final Field field; - - /** - * Constructor. - * - * @param field field containing the object - */ - public FieldExtractor(Field field) { - this.field = field; - } - - @Override - public Object extract(Object object) { - try { - return field.get(object); - - } catch (IllegalAccessException | IllegalArgumentException e) { - logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e); - return null; - } - } -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java deleted file mode 100644 index 9c5be5ff..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to extract an object stored in a map. - */ -public class MapExtractor implements Extractor { - - private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class); - - /** - * Key to the item to extract from the map. - */ - private final String key; - - /** - * Constructor. - * - * @param key key to the item to extract from the map - */ - public MapExtractor(String key) { - this.key = key; - } - - @Override - public Object extract(Object object) { - - if (object instanceof Map) { - Map<?, ?> map = (Map<?, ?>) object; - - return map.get(key); - - } else { - logger.warn("expecting a map, instead of {}", object.getClass()); - return null; - } - } -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java deleted file mode 100644 index 3efef5ec..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.extractor; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to extract an object by invoking a method on the container. - */ -public class MethodExtractor implements Extractor { - - private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class); - - /** - * Method to invoke to extract the contained object. - */ - private final Method method; - - /** - * Constructor. - * - * @param method method to invoke to extract the contained object - */ - public MethodExtractor(Method method) { - this.method = method; - } - - @Override - public Object extract(Object object) { - try { - return method.invoke(object); - - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e); - return null; - } - } -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java index 6be080f7..c78fb17b 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -63,7 +63,7 @@ public class BucketAssignments { /** * Constructor. - * + * * @param hostArray maps a bucket number (i.e., array index) to a host. All values * must be non-null */ @@ -81,7 +81,7 @@ public class BucketAssignments { /** * Gets the leader, which is the host with the minimum UUID. - * + * * @return the assignment leader */ public String getLeader() { @@ -103,7 +103,7 @@ public class BucketAssignments { /** * Determines if a host has an assignment. - * + * * @param host host to be checked * @return {@code true} if the host has an assignment, {@code false} otherwise */ @@ -123,7 +123,7 @@ public class BucketAssignments { /** * Gets all of the hosts that have an assignment. - * + * * @return all of the hosts that have an assignment */ public Set<String> getAllHosts() { @@ -143,7 +143,7 @@ public class BucketAssignments { /** * Gets the host assigned to a given bucket. - * + * * @param hashCode hash code of the item whose assignment is desired * @return the assigned host, or {@code null} if the item has no assigned host */ @@ -153,12 +153,12 @@ public class BucketAssignments { return null; } - return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length]; + return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length]; } /** * Gets the number of buckets. - * + * * @return the number of buckets */ public int size() { @@ -168,7 +168,7 @@ public class BucketAssignments { /** * Checks the validity of the assignments, verifying that all buckets have been * assigned to a host. - * + * * @throws PoolingFeatureException if the assignments are invalid */ public void checkValidity() throws PoolingFeatureException { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java deleted file mode 100644 index 3c34e971..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.message; - -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.drools.pooling.PoolingFeatureException; - -/** - * Message to forward an event to another host. - */ -public class Forward extends Message { - - /** - * Number of hops (i.e., number of times it's been forwarded) so far. - */ - private int numHops; - - /** - * Time, in milliseconds, at which the message was created. - */ - private long createTimeMs; - - /** - * Protocol of the receiving topic. - */ - private CommInfrastructure protocol; - - /** - * Topic on which the event was received. - */ - private String topic; - - /** - * The event pay load that was received on the topic. - */ - private String payload; - - /** - * The request id that was extracted from the event. - */ - private String requestId; - - /** - * Constructor. - */ - public Forward() { - super(); - } - - /** - * Constructor. - * - * @param source host on which the message originated - * @param protocol protocol - * @param topic topic - * @param payload the actual event data received on the topic - * @param requestId request id - */ - public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) { - super(source); - - this.numHops = 0; - this.createTimeMs = System.currentTimeMillis(); - this.protocol = protocol; - this.topic = topic; - this.payload = payload; - this.requestId = requestId; - } - - /** - * Increments {@link #numHops}. - */ - public void bumpNumHops() { - ++numHops; - } - - public int getNumHops() { - return numHops; - } - - public void setNumHops(int numHops) { - this.numHops = numHops; - } - - public long getCreateTimeMs() { - return createTimeMs; - } - - public void setCreateTimeMs(long createTimeMs) { - this.createTimeMs = createTimeMs; - } - - public CommInfrastructure getProtocol() { - return protocol; - } - - public void setProtocol(CommInfrastructure protocol) { - this.protocol = protocol; - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getPayload() { - return payload; - } - - public void setPayload(String payload) { - this.payload = payload; - } - - public String getRequestId() { - return requestId; - } - - public void setRequestId(String requestId) { - this.requestId = requestId; - } - - public boolean isExpired(long minCreateTimeMs) { - return (createTimeMs < minCreateTimeMs); - - } - - @Override - public void checkValidity() throws PoolingFeatureException { - - super.checkValidity(); - - if (protocol == null) { - throw new PoolingFeatureException("missing message protocol"); - } - - if (topic == null || topic.isEmpty()) { - throw new PoolingFeatureException("missing message topic"); - } - - /* - * Note: an empty pay load is OK, as an empty message could have been - * received on the topic. - */ - - if (payload == null) { - throw new PoolingFeatureException("missing message payload"); - } - - if (requestId == null || requestId.isEmpty()) { - throw new PoolingFeatureException("missing message requestId"); - } - - if (numHops < 0) { - throw new PoolingFeatureException("invalid message hop count"); - } - } - -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java deleted file mode 100644 index 5b7012d2..00000000 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.pooling.state; - -import java.util.Map; -import java.util.TreeMap; - -/** - * Filter Utilities. These methods create <i>TreeMap</i> objects, because they - * should only contain a small number of items. - */ -public class FilterUtils { - // message element names - public static final String MSG_CHANNEL = "channel"; - public static final String MSG_TIMESTAMP = "timestampMs"; - - // json element names - protected static final String JSON_CLASS = "class"; - protected static final String JSON_FILTERS = "filters"; - protected static final String JSON_FIELD = "field"; - protected static final String JSON_VALUE = "value"; - - // values to be stuck into the "class" element - protected static final String CLASS_OR = "Or"; - protected static final String CLASS_AND = "And"; - protected static final String CLASS_EQUALS = "Equals"; - - /** - * Constructor. - */ - private FilterUtils() { - super(); - } - - /** - * Makes a filter that verifies that a field equals a value. - * - * @param field name of the field to check - * @param value desired value - * @return a map representing an "equals" filter - */ - public static Map<String, Object> makeEquals(String field, String value) { - Map<String, Object> map = new TreeMap<>(); - map.put(JSON_CLASS, CLASS_EQUALS); - map.put(JSON_FIELD, field); - map.put(JSON_VALUE, value); - - return map; - } - - /** - * Makes an "and" filter, where all of the items must be true. - * - * @param items items to be checked - * @return an "and" filter - */ - public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) { - Map<String, Object> map = new TreeMap<>(); - map.put(JSON_CLASS, CLASS_AND); - map.put(JSON_FILTERS, items); - - return map; - } - - /** - * Makes an "or" filter, where at least one of the items must be true. - * - * @param items items to be checked - * @return an "or" filter - */ - public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) { - Map<String, Object> map = new TreeMap<>(); - map.put(JSON_CLASS, CLASS_OR); - map.put(JSON_FILTERS, items); - - return map; - } -} diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java index 59d264ec..c5761bfd 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,16 +20,8 @@ package org.onap.policy.drools.pooling.state; -import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL; -import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP; -import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd; -import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals; -import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; - -import java.util.Map; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.message.Heartbeat; -import org.onap.policy.drools.pooling.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +41,7 @@ public class StartState extends State { /** * Constructor. - * + * * @param mgr pooling manager */ public StartState(PoolingManager mgr) { @@ -58,7 +50,7 @@ public class StartState extends State { /** * Get Heart beat time stamp in milliseconds. - * + * * @return the time stamp inserted into the heart beat message */ public long getHbTimestampMs() { @@ -110,14 +102,4 @@ public class StartState extends State { return null; } - - @SuppressWarnings("unchecked") - @Override - public Map<String, Object> getFilter() { - // ignore everything except our most recent heart beat message - return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()), - makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs)))); - - } - } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index f2277be3..853c24d4 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -20,22 +20,15 @@ package org.onap.policy.drools.pooling.state; -import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL; -import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals; -import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr; - import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.onap.policy.drools.pooling.CancellableScheduledTask; import org.onap.policy.drools.pooling.PoolingManager; import org.onap.policy.drools.pooling.PoolingProperties; import org.onap.policy.drools.pooling.message.BucketAssignments; -import org.onap.policy.drools.pooling.message.Forward; import org.onap.policy.drools.pooling.message.Heartbeat; import org.onap.policy.drools.pooling.message.Identification; import org.onap.policy.drools.pooling.message.Leader; -import org.onap.policy.drools.pooling.message.Message; import org.onap.policy.drools.pooling.message.Offline; import org.onap.policy.drools.pooling.message.Query; import org.slf4j.Logger; @@ -71,18 +64,6 @@ public abstract class State { } /** - * Gets the server-side filter to use when polling the DMaaP internal topic. The - * default method returns a filter that accepts messages on the admin channel and on - * the host's own channel. - * - * @return the server-side filter to use. - */ - @SuppressWarnings("unchecked") - public Map<String, Object> getFilter() { - return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost())); - } - - /** * Cancels the timers added by this state. */ public final void cancelTimers() { @@ -142,26 +123,6 @@ public abstract class State { } /** - * Processes a message. The default method passes it to the manager to handle and - * returns {@code null}. - * - * @param msg message to be processed - * @return the new state, or {@code null} if the state is unchanged - */ - public State process(Forward msg) { - if (getHost().equals(msg.getChannel())) { - logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); - mgr.handle(msg); - - } else { - logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), - getTopic()); - } - - return null; - } - - /** * Processes a message. The default method just returns {@code null}. * * @param msg message to be processed @@ -293,16 +254,6 @@ public abstract class State { * @param channel channel * @param msg message to be published */ - protected final void publish(String channel, Forward msg) { - mgr.publish(channel, msg); - } - - /** - * Publishes a message on the specified channel. - * - * @param channel channel - * @param msg message to be published - */ protected final void publish(String channel, Heartbeat msg) { mgr.publish(channel, msg); } |