diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 201 |
1 files changed, 99 insertions, 102 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 636dc6e3..6d34d32b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +33,6 @@ import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; /** * Wrapper around libraries to consume from message bus - * */ public interface BusConsumer { @@ -67,7 +66,7 @@ public interface BusConsumer { /** * Sets the server-side filter. - * + * * @param filter new filter value, or {@code null} * @throws IllegalArgumentException if the consumer cannot be built with the new filter */ @@ -116,53 +115,47 @@ public interface BusConsumer { /** * Cambria Consumer Wrapper + * BusTopicParam object contains the following parameters + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams * @throws GeneralSecurityException * @throws MalformedURLException */ - public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean useSelfSignedCerts) { - this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, useHttps, useSelfSignedCerts); - } + public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { - - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); this.builder = new CambriaClientBuilders.ConsumerBuilder(); - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) + .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) + .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(fetchTimeout + 30000); - if (useHttps) { + if (busTopicParams.isUseHttps()) { builder.usingHttps(); - if (useSelfSignedCerts) { + if (busTopicParams.isAllowSelfSignedCerts()) { builder.allowSelfSignedCertificates(); } } - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { + builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); + if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { + builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); } try { @@ -282,34 +275,36 @@ public interface BusConsumer { /** * MR Consumer Wrapper + * <p> + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * username AAF Login + * password AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed attributes * @throws MalformedURLException */ - public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit) throws MalformedURLException { + public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); - if (topic == null || topic.isEmpty()) { + if (busTopicParams.isTopicNullOrEmpty()) { throw new IllegalArgumentException("No topic for DMaaP"); } - this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, null, apiKey, apiSecret); + this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(), + busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(), + busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null, + busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - this.consumer.setUsername(username); - this.consumer.setPassword(password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); } @Override @@ -374,29 +369,29 @@ public interface BusConsumer { private final Properties props; /** + * BusTopicParams contain the following parameters * MR Consumer Wrapper + * <p> + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * aafLogin AAF Login + * aafPassword AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed params * @throws MalformedURLException */ - public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps) throws MalformedURLException { + public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { + if (busTopicParams.isServersNullOrEmpty()) { throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); } @@ -404,13 +399,13 @@ public interface BusConsumer { props = new Properties(); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(servers.get(0) + ":3905"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); } else { props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(servers.get(0) + ":3904"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); } this.consumer.setProps(props); @@ -434,70 +429,72 @@ public interface BusConsumer { private final Properties props; - public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, - String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { - + public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); - final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + final String dme2RouteOffer = busTopicParams.getAdditionalProps() + .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + if (busTopicParams.isEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + if (busTopicParams.isAftEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + if (busTopicParams.isLatitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + if (busTopicParams.isLongitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((dme2Partner == null || dme2Partner.isEmpty()) + if ((busTopicParams.isPartnerNullOrEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } - final String serviceName = servers.get(0); + final String serviceName = busTopicParams.getServers().get(0); this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); props = new Properties(); props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); + props.setProperty("username", busTopicParams.getUserName()); + props.setProperty("password", busTopicParams.getPassword()); /* These are required, no defaults */ - props.setProperty("topic", topic); + props.setProperty("topic", busTopicParams.getTopic()); - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); /* These are optional, will default to these values if not set in additionalProps */ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); @@ -511,7 +508,7 @@ public interface BusConsumer { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "GET"); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); } else { @@ -520,8 +517,8 @@ public interface BusConsumer { props.setProperty("contenttype", "application/json"); - if (additionalProps != null) { - for (Map.Entry<String, String> entry : additionalProps.entrySet()) { + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { props.put(entry.getKey(), entry.getValue()); } } |