diff options
Diffstat (limited to 'appc-oam/appc-oam-bundle/src/main/java/org')
-rw-r--r-- | appc-oam/appc-oam-bundle/src/main/java/org/onap/appc/oam/messageadapter/MessageAdapter.java | 101 |
1 files changed, 17 insertions, 84 deletions
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/onap/appc/oam/messageadapter/MessageAdapter.java b/appc-oam/appc-oam-bundle/src/main/java/org/onap/appc/oam/messageadapter/MessageAdapter.java index 7112b7d99..f3160511f 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/onap/appc/oam/messageadapter/MessageAdapter.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/onap/appc/oam/messageadapter/MessageAdapter.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP : APPC * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Copyright (C) 2017 Amdocs * ============================================================================= @@ -27,118 +27,51 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.lang.ObjectUtils; -import org.onap.appc.adapter.message.MessageAdapterFactory; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -import java.util.HashSet; -import java.util.Properties; +import org.onap.appc.srvcomm.messaging.MessagingConnector; public class MessageAdapter { private final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapter.class); - private final String PROP_APPC_OAM_DISABLED = "appc.OAM.disabled"; - private final String PROP_APPC_OAM_TOPIC_WRITE = "appc.OAM.topic.write"; - private String PROP_APPC_OAM_CLIENT_KEY = "appc.OAM.client.key"; - private String PROP_APPC_OAM_CLIENT_SECRET = "appc.OAM.client.secret"; - private String PROP_APPC_OAM_POOLMEMBERS = "appc.OAM.poolMembers"; + private static final String PROPERTIES_PREFIX = "appc.OAM"; - private Producer producer; + private MessagingConnector messagingConnector; private String partition; - private Configuration configuration; - private HashSet<String> pool; - private String writeTopic; - private String apiKey; - private String apiSecret; private boolean isDisabled; /** * Initialize producer client to post messages using configuration properties. */ public void init() { - configuration = ConfigurationFactory.getConfiguration(); - Properties properties = configuration.getProperties(); - updateProperties(properties); if (isAppcOamPropsListenerEnabled()) { - createProducer(); + messagingConnector = new MessagingConnector(); } else { logger.warn(String.format("The listener %s is disabled and will not be run", "appc.OAM")); } } + + public void init(MessagingConnector connector) { - /** - * Create producer using MessageAdapterFactory which is found through bundle context. - */ - void createProducer() { - BundleContext ctx = FrameworkUtil.getBundle(MessageAdapter.class).getBundleContext(); - if (ctx == null) { - logger.warn("MessageAdapter cannot create producer due to no bundle context."); - return; - } - - ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); - if (svcRef == null) { - logger.warn("MessageAdapter cannot create producer due to no MessageAdapterFactory service reference."); - return; - } - - Producer localProducer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic, - apiKey, apiSecret); - - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - localProducer.useHttps(true); - break; - } + if (isAppcOamPropsListenerEnabled()) { + messagingConnector = connector; + } else { + logger.warn(String.format("The listener %s is disabled and will not be run", "appc.OAM")); } - - producer = localProducer; - - logger.debug("MessageAdapter created producer."); } - /** - * Read property value to set writeTopic, apiKey, apiSecret and pool. - * - * @param props of configuration - */ - private void updateProperties(Properties props) { - logger.trace("Entering to updateProperties with Properties = " + ObjectUtils.toString(props)); - - pool = new HashSet<>(); - if (props != null) { - isDisabled = Boolean.parseBoolean(props.getProperty(PROP_APPC_OAM_DISABLED)); - writeTopic = props.getProperty(PROP_APPC_OAM_TOPIC_WRITE); - apiKey = props.getProperty(PROP_APPC_OAM_CLIENT_KEY); - apiSecret = props.getProperty(PROP_APPC_OAM_CLIENT_SECRET); - String hostnames = props.getProperty(PROP_APPC_OAM_POOLMEMBERS); - if (hostnames != null && !hostnames.isEmpty()) { - for (String name : hostnames.split(",")) { - pool.add(name); - } - } - } - } /** * Get producer. If it is null, call createProducer to create it again. * * @return Producer */ - Producer getProducer() { - if (producer == null) { - // In case, producer was not properly set yet, set it again. - logger.info("Calling createProducer as producer is null."); - createProducer(); + MessagingConnector getMessagingConnector() { + if (messagingConnector == null) { + messagingConnector = new MessagingConnector(); } - return producer; + return messagingConnector; } /** @@ -161,8 +94,8 @@ public class MessageAdapter { logger.debug("UEB Response = " + jsonMessage); } - Producer myProducer = getProducer(); - success = myProducer != null && myProducer.post(this.partition, jsonMessage); + MessagingConnector connector = getMessagingConnector(); + success = connector != null && connector.publishMessage(PROPERTIES_PREFIX, this.partition, jsonMessage); } catch (JsonProcessingException e1) { logger.error("Error generating Json from UEB message " + e1.getMessage()); success = false; |