From 265f24eb2a14ec15f397501212cb7eb887cc1f26 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Fri, 1 Mar 2019 14:13:11 -0500 Subject: Add various listener classes Added various listener classes to support dispatch by message type and request id. The listeners are intended to form a pipeline: TopicSource => MessageTypeDispatcher => RequestIdDispatcher => TypedMessageListener Removed "PAP" from license. Changed "handler" to "listener" in most places. Simplified a test case. Verified that no error message logged on success cases. Removed println from test. Updated some comments. Change-Id: Ife265d14a6c5c8531601d9ce1343b88c1f8986a8 Issue-ID: POLICY-1444 Signed-off-by: Jim Hahn --- .../endpoints/listeners/RequestIdDispatcher.java | 150 +++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java new file mode 100644 index 00000000..9ba73c9b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java @@ -0,0 +1,150 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import com.google.common.base.Strings; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dispatches messages to listeners based on the request id extracted from the message. A + * listener may be registered for a specific request id or for messages that have no + * request id (i.e., autonomous messages). Note: only one listener may be registered for a + * specific request id. + * + * @param type of message/POJO this handles + */ +public class RequestIdDispatcher extends ScoListener { + + private static final Logger logger = LoggerFactory.getLogger(RequestIdDispatcher.class); + + /** + * Name of the request id field, which may be hierarchical. + */ + private final String[] requestIdFieldNames; + + /** + * Listeners for autonomous messages. + */ + private final ConcurrentLinkedQueue> listeners = new ConcurrentLinkedQueue<>(); + + /** + * Listeners for specific request ids. + */ + private final ConcurrentHashMap> req2listener = new ConcurrentHashMap<>(); + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param requestIdFieldNames name of the request id field, which may be hierarchical + */ + public RequestIdDispatcher(Class clazz, String... requestIdFieldNames) { + super(clazz); + this.requestIdFieldNames = requestIdFieldNames; + } + + /** + * Registers a listener for autonomous messages. + * + * @param listener listener to be registered + */ + public void register(TypedMessageListener listener) { + listeners.add(listener); + } + + /** + * Registers a listener for a particular request id. + * + * @param reqid request id of interest + * @param listener listener to be registered + */ + public void register(String reqid, TypedMessageListener listener) { + if (Strings.isNullOrEmpty(reqid)) { + throw new IllegalArgumentException("attempt to register a listener with an empty request id"); + } + + req2listener.put(reqid, listener); + } + + /** + * Unregisters a listener for autonomous messages. + * + * @param listener listener to be unregistered + */ + public void unregister(TypedMessageListener listener) { + listeners.remove(listener); + } + + /** + * Unregisters the listener associated with a particular request id. + * + * @param reqid request id whose listener is to be unregistered + */ + public void unregister(String reqid) { + req2listener.remove(reqid); + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + + // extract the request id + String reqid = sco.getString(requestIdFieldNames); + + // dispatch the message + if (Strings.isNullOrEmpty(reqid)) { + // it's an autonomous message - offer it to all autonomous listeners + for (TypedMessageListener listener : listeners) { + offerToListener(infra, topic, message, listener); + } + + } else { + // it's a response to a particular request + offerToListener(infra, topic, message, req2listener.get(reqid)); + } + } + + /** + * Offers a message to a listener. + * + * @param infra infrastructure on which the message was received + * @param topic topic on which the message was received + * @param msg message that was received + * @param listener listener to which the message should be offered, or {@code null} + */ + private void offerToListener(CommInfrastructure infra, String topic, T msg, TypedMessageListener listener) { + + if (listener == null) { + return; + } + + try { + listener.onTopicEvent(infra, topic, msg); + + } catch (RuntimeException e) { + logger.warn("listener {} failed to process message: {}", listener, msg, e); + } + } +} -- cgit 1.2.3-korg