diff options
author | Jorge Hernandez <jh1730@att.com> | 2017-09-13 14:48:05 -0500 |
---|---|---|
committer | Jorge Hernandez <jh1730@att.com> | 2017-09-14 01:18:09 -0500 |
commit | 1f2fff6c740695cb2ae430af16684ebc13bc57d5 (patch) | |
tree | de7f031d4efb4b9cd1216acbb77a5e54319a12a8 /policy-endpoints/src | |
parent | 009582f2e2a4bd3b09da3721a8b6587d5ac89723 (diff) |
added "fetch timeout" backoff for UEB endpoints
this prevents high frequency fruitless attempts to connect to an
unreachable (perhaps temporarily) UEB server.
added additional management apis for noop endpoints.
bump versions of jackson parsers so through snakeyaml library
throughtransitive dependencies is bump up from 1.15 as it is
resolved in the classpath at runtime to > 1.17
otherwise causes incompatibilities with the one used in
controlloops under their own classloaders, as they expect 1.17.
Change-Id: I936348c4b93a2c409c22568868c44ed330dc18f7
Issue-ID: POLICY-119
Signed-off-by: Jorge Hernandez <jh1730@att.com>
Diffstat (limited to 'policy-endpoints/src')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java | 827 |
1 files changed, 416 insertions, 411 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index edb03bba..c4155b77 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,427 +34,432 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.impl.MRConsumerImpl; import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; /** * Wrapper around libraries to consume from message bus * */ public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws Exception when error encountered by underlying libraries - */ - public Iterable<String> fetch() throws InterruptedException, IOException; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * Cambria based consumer - */ - public static class CambriaConsumerWrapper implements BusConsumer { - - /** - * Cambria client - */ - protected CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws GeneralSecurityException - * @throws MalformedURLException - */ - public CambriaConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) - throws IllegalArgumentException { - - ConsumerBuilder builder = - new CambriaClientBuilders.ConsumerBuilder(); - - - if (useHttps){ - - if(useSelfSignedCerts){ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps() - .allowSelfSignedCertificates(); - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps(); - } - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit); - } - - if (apiKey != null && !apiKey.isEmpty() && - apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - return this.consumer.fetch(); - } - - @Override - public void close() { - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper []"; - } - } - - /** - * MR based consumer - */ - public abstract class DmaapConsumerWrapper implements BusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String username, String password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) - throws MalformedURLException { - - this.fetchTimeout = fetchTimeout; - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImpl(servers, topic, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - null, apiKey, apiSecret); - - this.consumer.setUsername(username); - this.consumer.setPassword(password); - } - - @Override - public Iterable<String> fetch() throws InterruptedException, IOException { - MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}" + - response.getResponseCode(), - response.getResponseMessage()); - - if (response.getResponseCode() == null || - !response.getResponseCode().equals("200")) { - - logger.error("DMaaP consumer received: {} : {}", - response.getResponseCode(), - response.getResponseMessage()); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - - /* fall through */ - } - } - - if (response.getActualMessages() == null) - return new ArrayList<>(); - else - return response.getActualMessages(); - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - this.consumer.close(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - /** - * MR based consumer - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private Properties props; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapAafConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String aafLogin, String aafPassword, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { - - super(servers, topic, apiKey, apiSecret, - aafLogin, aafPassword, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && servers.get(0).equals("")) || - (servers == null) || (servers.isEmpty())) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if(useHttps){ - props.setProperty("Protocol", "https"); - this.consumer.setHost(servers.get(0) + ":3905"); - - } - else{ - props.setProperty("Protocol", "http"); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - MRConsumerImpl consumer = (MRConsumerImpl) this.consumer; - - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private Properties props; - - public DmaapDmeConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String dme2Login, String dme2Password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String dme2Partner, - String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException { - - - - super(servers, topic, apiKey, apiSecret, - dme2Login, dme2Password, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (latitude == null || latitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); - } if (longitude == null || longitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + - PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - String serviceName = servers.get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); - - props = new Properties(); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); - - /* These are required, no defaults */ - props.setProperty("topic", topic); - - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - if (dme2Partner != null) - props.setProperty("Partner", dme2Partner); - if (dme2RouteOffer != null) - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); - - if(useHttps){ - props.setProperty("Protocol", "https"); - - } - else{ - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (additionalProps != null) { - for(String key : additionalProps.keySet()) - props.put(key, additionalProps.get(key)); - } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - } -} + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws InterruptedException, IOException; + + /** + * close underlying library consumer + */ + public void close(); + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Cambria client + */ + protected CambriaConsumer consumer; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) + throws IllegalArgumentException { + + this.fetchTimeout = fetchTimeout; + + final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + + if (useHttps) { + + if (useSelfSignedCerts) { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() + .allowSelfSignedCertificates(); + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + } + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + } + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Iterable<String> fetch() throws IOException, InterruptedException { + try { + return this.consumer.fetch(); + } catch (final IOException e) { + logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), + this.fetchTimeout); + synchronized (this.closeCondition) { + this.closeCondition.wait(this.fetchTimeout); + } + + throw e; + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]"); + return builder.toString(); + } + } + + /** + * MR based consumer + */ + public abstract class DmaapConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param username AAF Login + * @param password AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + String username, String password, String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, null, apiKey, apiSecret); + + this.consumer.setUsername(username); + this.consumer.setPassword(password); + } + + @Override + public Iterable<String> fetch() throws InterruptedException, IOException { + final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + if (response == null) { + logger.warn("{}: DMaaP NULL response received", this); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + return new ArrayList<>(); + } else { + logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), + response.getResponseMessage()); + + if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) { + + logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + + /* fall through */ + } + } + + if (response.getActualMessages() == null) + return new ArrayList<>(); + else + return response.getActualMessages(); + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); + + private final Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String aafLogin, String aafPassword, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps) + throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && servers.get(0).equals("")) || (servers == null) + || (servers.isEmpty())) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if (useHttps) { + props.setProperty("Protocol", "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } else { + props.setProperty("Protocol", "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + logger.info("{}: CREATION", this); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + final MRConsumerImpl consumer = this.consumer; + + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + + private final Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String dme2Login, String dme2Password, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, String environment, + String aftEnvironment, String dme2Partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + + final String dme2RouteOffer = + additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + final String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if (useHttps) { + props.setProperty("Protocol", "https"); + + } else { + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for (final String key : additionalProps.keySet()) + props.put(key, additionalProps.get(key)); + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + } +} |