diff options
Diffstat (limited to 'appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java')
-rw-r--r-- | appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java | 140 |
1 files changed, 95 insertions, 45 deletions
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java index 21f096678..d93feabe4 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java @@ -32,7 +32,6 @@ import org.openecomp.appc.adapter.message.MessageAdapterFactory; import org.openecomp.appc.adapter.message.Producer; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; -import org.openecomp.appc.listener.impl.EventHandlerImpl; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceReference; @@ -42,56 +41,84 @@ import java.util.Properties; public class MessageAdapter { + private final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapter.class); + + private final String PROP_APPC_OAM_DISABLED = "appc.OAM.disabled"; + private final String PROP_APPC_OAM_TOPIC_WRITE = "appc.OAM.topic.write"; + private String PROP_APPC_OAM_CLIENT_KEY = "appc.OAM.client.key"; + private String PROP_APPC_OAM_CLIENT_SECRET = "appc.OAM.client.secret"; + private String PROP_APPC_OAM_POOLMEMBERS = "appc.OAM.poolMembers"; + private Producer producer; - private String partition ; + private String partition; private Configuration configuration; private HashSet<String> pool; private String writeTopic; private String apiKey; private String apiSecret; - - private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapter.class); + private boolean isDisabled; /** - * Initialize producer client to post messages using configuration properties + * Initialize producer client to post messages using configuration properties. */ - public void init(){ - this.producer = getProducer(); - } - - private Producer getProducer() { + public void init() { configuration = ConfigurationFactory.getConfiguration(); - Properties properties=configuration.getProperties(); + Properties properties = configuration.getProperties(); updateProperties(properties); - Producer localProducer = null; - + + if (isAppcOamPropsListenerEnabled()) { + createProducer(); + } else { + logger.warn(String.format("The listener %s is disabled and will not be run", "appc.OAM")); + } + } + + /** + * Create producer using MessageAdapterFactory which is found through bundle context. + */ + void createProducer() { BundleContext ctx = FrameworkUtil.getBundle(MessageAdapter.class).getBundleContext(); - if (ctx != null) { - ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); - if (svcRef != null) { - localProducer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic, apiKey, apiSecret); - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - localProducer.useHttps(true); - break; - } - } - } + if (ctx == null) { + logger.warn("MessageAdapter cannot create producer due to no bundle context."); + return; } - return localProducer; + ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); + if (svcRef == null) { + logger.warn("MessageAdapter cannot create producer due to no MessageAdapterFactory service reference."); + return; + } + + Producer localProducer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic, + apiKey, apiSecret); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + localProducer.useHttps(true); + break; + } + } + + producer = localProducer; + + logger.debug("MessageAdapter created producer."); } + /** + * Read property value to set writeTopic, apiKey, apiSecret and pool. + * + * @param props of configuration + */ private void updateProperties(Properties props) { - if (logger.isTraceEnabled()) { - logger.trace("Entering to updateProperties with Properties = "+ ObjectUtils.toString(props)); - } + logger.trace("Entering to updateProperties with Properties = " + ObjectUtils.toString(props)); + pool = new HashSet<>(); if (props != null) { - writeTopic = props.getProperty("appc.OAM.topic.write"); - apiKey = props.getProperty("appc.OAM.client.key"); - apiSecret = props.getProperty("appc.OAM.client.secret"); - String hostnames = props.getProperty("appc.OAM.poolMembers"); + isDisabled = Boolean.parseBoolean(props.getProperty(PROP_APPC_OAM_DISABLED)); + writeTopic = props.getProperty(PROP_APPC_OAM_TOPIC_WRITE); + apiKey = props.getProperty(PROP_APPC_OAM_CLIENT_KEY); + apiSecret = props.getProperty(PROP_APPC_OAM_CLIENT_SECRET); + String hostnames = props.getProperty(PROP_APPC_OAM_POOLMEMBERS); if (hostnames != null && !hostnames.isEmpty()) { for (String name : hostnames.split(",")) { pool.add(name); @@ -101,33 +128,56 @@ public class MessageAdapter { } /** - * Posts message to UEB. As UEB accepts only json messages this method first convert uebMessage to json format and post it to UEB. - * @param oamContext response data that based on it a message will be send to UEB (the format of the message that will be sent to UEB based on the action and its YANG domainmodel). - * @return True if message is postes successfully else False + * Get producer. If it is null, call createProducer to create it again. + * + * @return Producer */ - public boolean post(OAMContext oamContext){ - boolean success; + Producer getProducer() { + if (producer == null) { + // In case, producer was not properly set yet, set it again. + logger.info("Calling createProducer as producer is null."); + createProducer(); + } + + return producer; + } + + /** + * Posts message to UEB. As UEB accepts only json messages this method first convert uebMessage to json format + * and post it to UEB. + * + * @param oamContext response data that based on it a message will be send to UEB (the format of the message that + * will be sent to UEB based on the action and its YANG domainmodel). + */ + public void post(OAMContext oamContext) { if (logger.isTraceEnabled()) { logger.trace("Entering to post with AsyncResponse = " + ObjectUtils.toString(oamContext)); } + boolean success; String jsonMessage; try { jsonMessage = Converter.convAsyncResponseToUebOutgoingMessageJsonString(oamContext); if (logger.isDebugEnabled()) { logger.debug("UEB Response = " + jsonMessage); } - success = producer.post(this.partition, jsonMessage); + + Producer myProducer = getProducer(); + success = myProducer != null && myProducer.post(this.partition, jsonMessage); } catch (JsonProcessingException e1) { - logger.error("Error generating Jason from UEB message "+ e1.getMessage()); - success= false; - }catch (Exception e){ - logger.error("Error sending message to UEB "+e.getMessage()); - success= false; + logger.error("Error generating Json from UEB message " + e1.getMessage()); + success = false; + } catch (Exception e) { + logger.error("Error sending message to UEB " + e.getMessage(), e); + success = false; } + if (logger.isTraceEnabled()) { - logger.trace("Exiting from post with (success = "+ ObjectUtils.toString(success)+")"); + logger.trace("Exiting from post with (success = " + ObjectUtils.toString(success) + ")"); } - return success; + } + + private boolean isAppcOamPropsListenerEnabled() { + return !isDisabled; } } |