diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp')
14 files changed, 1177 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java index 7a2e9711..a3c2230d 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java @@ -31,12 +31,7 @@ public interface TopicListener { * @param commType communication infrastructure type * @param topic topic name * @param event event message as a string - * - * @return boolean. True if the invoking event dispatcher should continue - * dispatching the event to subsequent listeners. False if it is requested - * to the invoking event dispatcher to stop dispatching the same event to - * other listeners of less priority. This mechanism is generally not used. */ - public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); + public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java index 5b4cfd42..c3d02d14 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java @@ -24,17 +24,64 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; -import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; import org.openecomp.policy.drools.properties.PolicyProperties; /** * DMAAP Topic Sink Factory */ public interface DmaapTopicSinkFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this sink endpoint managed? + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) ; /** * Instantiates a new DMAAP Topic Sink @@ -58,7 +105,9 @@ public interface DmaapTopicSinkFactory { String userName, String password, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -142,7 +191,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String userName, String password, String partitionKey, - boolean managed) + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (topic == null || topic.isEmpty()) { @@ -158,7 +215,45 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, password, - partitionKey); + partitionKey, + environment, aftEnvironment, + partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) + dmaapTopicWriters.put(topic, dmaapTopicSink); + return dmaapTopicSink; + } + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + boolean managed, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = + new InlineDmaapTopicSink(servers, topic, + apiKey, apiSecret, + userName, password, + partitionKey, useHttps, allowSelfSignedCerts); if (managed) dmaapTopicWriters.put(topic, dmaapTopicSink); @@ -172,7 +267,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { */ @Override public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { - return this.build(servers, topic, null, null, null, null, null, true); + return this.build(servers, topic, null, null, null, null, null, true, false, false); } @@ -196,12 +291,10 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("No DMAAP servers provided in " + properties); - continue; - } - List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + List<String> serverList; + if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + else serverList = new ArrayList<>(); String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + @@ -223,14 +316,100 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = properties + .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String,String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + if (dme2Version != null && !dme2Version.isEmpty()) + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + + if (servers == null || servers.isEmpty()) { + logger.error("No DMaaP servers or DME2 ServiceName provided"); + continue; + } + boolean managed = true; if (managedString != null && !managedString.isEmpty()) { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, - apiKey, apiSecret, aafMechId, aafPassword, - partitionKey, managed); + apiKey, apiSecret, + aafMechId, aafPassword, + partitionKey, + dme2Environment, dme2AftEnvironment, + dme2Partner, dme2Latitude, dme2Longitude, + dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + dmaapTopicWriters.add(dmaapTopicSink); } return dmaapTopicWriters; @@ -305,4 +484,4 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return writers; } -} +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java index f8d85eb7..9f60556c 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java @@ -24,16 +24,26 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; import org.openecomp.policy.drools.properties.PolicyProperties; + /** * DMAAP Topic Source Factory */ public interface DmaapTopicSourceFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; /** * Creates an DMAAP Topic Source based on properties files @@ -60,6 +70,8 @@ public interface DmaapTopicSourceFactory { * @param fetchTimeout Read Fetch Timeout * @param fetchLimit Fetch Limit * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? * * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present @@ -74,7 +86,56 @@ public interface DmaapTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName user name + * @param password password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -104,7 +165,7 @@ public interface DmaapTopicSourceFactory { * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource build(List<String> servers, + public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException; @@ -154,7 +215,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { */ protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<String, DmaapTopicSource>(); - + /** * {@inheritDoc} */ @@ -169,7 +230,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (topic == null || topic.isEmpty()) { @@ -185,7 +254,53 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, + environment, aftEnvironment, partner, + latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) + dmaapTopicSources.put(topic, dmaapTopicSource); + + return dmaapTopicSource; + } + } + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("DMaaP Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = + new SingleThreadedDmaapTopicSource(servers, topic, + apiKey, apiSecret, userName, password, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); if (managed) dmaapTopicSources.put(topic, dmaapTopicSource); @@ -216,12 +331,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("No UEB servers provided in " + properties); - continue; - } - - List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + List<String> serverList; + if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + else serverList = new ArrayList<>(); String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + @@ -250,6 +362,70 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = properties + .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String,String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + if (dme2Version != null && !dme2Version.isEmpty()) + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + + + if (servers == null || servers.isEmpty()) { + + logger.error("No DMaaP servers or DME2 ServiceName provided"); + continue; + } + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { try { @@ -279,10 +455,33 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, managed); + fetchTimeout, fetchLimit, + dme2Environment, dme2AftEnvironment, dme2Partner, + dme2Latitude, dme2Longitude, dme2AdditionalProps, + managed, useHttps, allowSelfSignedCerts); + dmaapTopicSource_s.add(uebTopicSource); } } @@ -291,25 +490,29 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { /** * {@inheritDoc} + * @throws IllegalArgumentException */ @Override public DmaapTopicSource build(List<String> servers, String topic, String apiKey, - String apiSecret) { + String apiSecret) throws IllegalArgumentException { return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, - true); + true, + false, + false); } /** * {@inheritDoc} + * @throws IllegalArgumentException */ @Override - public DmaapTopicSource build(List<String> servers, String topic) { + public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException { return this.build(servers, topic, null, null); } @@ -352,7 +555,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { if (dmaapTopicSources.containsKey(topic)) { return dmaapTopicSources.get(topic); } else { - throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found"); + throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java index 85b98838..432f035c 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java @@ -54,7 +54,9 @@ public interface UebTopicSinkFactory { String apiKey, String apiSecret, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -135,9 +137,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { String apiKey, String apiSecret, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException("A topic must be provided"); } @@ -149,7 +157,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, - apiKey, apiSecret,partitionKey); + apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts); if (managed) uebTopicSinks.put(topic, uebTopicWriter); @@ -164,7 +172,8 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { */ @Override public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { - return this.build(servers, topic, null, null, null, true); + + return this.build(servers, topic, null, null, null, true, false, false); } @@ -212,9 +221,28 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, - partitionKey, managed); + partitionKey, managed, useHttps, allowSelfSignedCerts); uebTopicWriters.add(uebTopicWriter); } return uebTopicWriters; diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java index bf2a4038..1729576f 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java @@ -71,7 +71,9 @@ public interface UebTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -162,8 +164,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException("A topic must be provided"); @@ -178,7 +185,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); if (managed) uebTopicSources.put(topic, uebTopicSource); @@ -263,10 +270,28 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, managed); + fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); uebTopicSources.add(uebTopicSource); } } @@ -281,11 +306,12 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { String topic, String apiKey, String apiSecret) { + return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - UebTopicSource.DEFAULT_LIMIT_FETCH, true); + UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); } /** diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java index 6fee5ce0..a34d361b 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -22,12 +22,20 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory; +import org.openecomp.policy.drools.properties.PolicyProperties; + import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; @@ -76,17 +84,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) throws IllegalArgumentException { ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); - builder.knownAs(consumerGroup, consumerInstance) + + if (useHttps){ + + if(useSelfSignedCerts){ + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit) + .usingHttps() + .allowSelfSignedCertificates(); + } + else{ + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit) + .usingHttps(); + } + } + else{ + builder.knownAs(consumerGroup, consumerInstance) .usingHosts(servers) .onTopic(topic) .waitAtServer(fetchTimeout) .receivingAtMost(fetchLimit); + } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { @@ -123,8 +154,11 @@ public interface BusConsumer { /** * MR based consumer */ - public static class DmaapConsumerWrapper implements BusConsumer { + public abstract class DmaapConsumerWrapper implements BusConsumer { + protected int fetchTimeout; + protected Object closeCondition = new Object(); + /** * MR Consumer */ @@ -137,47 +171,75 @@ public interface BusConsumer { * @param topic topic * @param apiKey API Key * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password + * @param username AAF Login + * @param password AAF Password * @param consumerGroup Consumer Group * @param consumerInstance Consumer Instance * @param fetchTimeout Fetch Timeout * @param fetchLimit Fetch Limit + * @throws MalformedURLException */ + @SuppressWarnings("unchecked") public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String aafLogin, String aafPassword, + String username, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) - throws Exception { + int fetchTimeout, int fetchLimit, boolean useHttps) + + throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, null, apiKey, apiSecret); - this.consumer.setUsername(aafLogin); - this.consumer.setPassword(aafPassword); + this.consumer.setUsername(username); + this.consumer.setPassword(password); + - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - Properties props = new Properties(); - props.setProperty("Protocol", "http"); - this.consumer.setProps(props); - this.consumer.setHost(servers.get(0) + ":3904");; } /** * {@inheritDoc} */ public Iterable<String> fetch() throws Exception { - return this.consumer.fetch(); + MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + + if (PolicyLogger.isDebugEnabled() && response != null) + PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage()); + + if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) { + if (response.getResponseCode() == null) + PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null"); + else + PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + } + + if (response.getActualMessages() == null) + return new ArrayList<String>(); + else + return response.getActualMessages(); } /** * {@inheritDoc} */ public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + this.consumer.close(); } @@ -196,7 +258,181 @@ public interface BusConsumer { } } + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + private Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, + aafLogin, aafPassword, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && servers.get(0).equals("")) || + (servers == null) || (servers.size() == 0)) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if(useHttps){ + props.setProperty("Protocol", "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } + else{ + props.setProperty("Protocol", "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + MRConsumerImpl consumer = (MRConsumerImpl) this.consumer; + + builder. + append("DmaapConsumerWrapper ["). + append("consumer.getAuthDate()=").append(consumer.getAuthDate()). + append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). + append(", consumer.getHost()=").append(consumer.getHost()). + append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). + append(", consumer.getUsername()=").append(consumer.getUsername()). + append("]"); + return builder.toString(); + } + } + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + private Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String dme2Login, String dme2Password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String dme2Partner, + String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, + dme2Login, dme2Password, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps); + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if(useHttps){ + props.setProperty("Protocol", "https"); + + } + else{ + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for(String key : additionalProps.keySet()) + props.put(key, additionalProps.get(key)); + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this); + } + } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java index 798bf989..b5595b2d 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -24,14 +24,20 @@ import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory; +import org.openecomp.policy.drools.properties.PolicyProperties; +import org.slf4j.LoggerFactory; + import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -65,15 +71,21 @@ public interface BusPublisher { public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, - String apiSecret) - throws IllegalArgumentException { + String apiSecret, boolean useHttps) throws IllegalArgumentException { PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(servers) + + + if (useHttps){ + + builder.usingHosts(servers) + .onTopic(topic) + .usingHttps(); + } + else{ + builder.usingHosts(servers) .onTopic(topic); - - // Only supported in 0.2.4 version - // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER); + } + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { @@ -142,43 +154,105 @@ public interface BusPublisher { /** * DmaapClient library wrapper */ - public static class DmaapPublisherWrapper implements BusPublisher { + public abstract class DmaapPublisherWrapper implements BusPublisher { /** * MR based Publisher */ protected MRSimplerBatchPublisher publisher; + protected Properties props; - public DmaapPublisherWrapper(List<String> servers, String topic, - String aafLogin, - String aafPassword) { + /** + * MR Publisher Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param username AAF or DME2 Login + * @param password AAF or DME2 Password + */ + public DmaapPublisherWrapper(ProtocolTypeConstants protocol, + List<String> servers, String topic, + String username, + String password, boolean useHttps) throws IllegalArgumentException { + - ArrayList<String> dmaapServers = new ArrayList<String>(); - for (String server: servers) { - dmaapServers.add(server + ":3904"); + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); } - - this.publisher = - new MRSimplerBatchPublisher.Builder(). - againstUrls(dmaapServers). - onTopic(topic). - build(); - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + if (servers == null || servers.isEmpty()) + throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); + + ArrayList<String> dmaapServers = new ArrayList<String>(); + if(useHttps){ + for (String server: servers) { + dmaapServers.add(server + ":3905"); + } + + } + else{ + for (String server: servers) { + dmaapServers.add(server + ":3904"); + } + } + + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { + ArrayList<String> dmaapServers = new ArrayList<String>(); + dmaapServers.add("0.0.0.0:3904"); + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + } + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - this.publisher.setUsername(aafLogin); - this.publisher.setPassword(aafPassword); + this.publisher.setUsername(username); + this.publisher.setPassword(password); - Properties props = new Properties(); - props.setProperty("Protocol", "http"); + props = new Properties(); + + if(useHttps){ + + props.setProperty("Protocol", "https"); + } + else{ + + props.setProperty("Protocol", "http"); + + } + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); this.publisher.setProps(props); - this.publisher.setHost(servers.get(0)); + if (protocol == ProtocolTypeConstants.AAF_AUTH) + this.publisher.setHost(servers.get(0)); - if (PolicyLogger.isInfoEnabled()) + if (PolicyLogger.isInfoEnabled()) { PolicyLogger.info(DmaapPublisherWrapper.class.getName(), "CREATION: " + this); + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue()); + } + } /** @@ -208,7 +282,16 @@ public interface BusPublisher { if (message == null) throw new IllegalArgumentException("No message provided"); + this.publisher.setPubResponse(new MRPublisherResponse()); this.publisher.send(partitionId, message); + MRPublisherResponse response = this.publisher.sendBatchWithResponse(); + if (PolicyLogger.isDebugEnabled() && response != null) { + PolicyLogger.debug(DmaapPublisherWrapper.class.getName(), + "DMaaP publisher received " + response.getResponseCode() + ": " + + response.getResponseMessage()); + + } + return true; } @@ -227,5 +310,97 @@ public interface BusPublisher { return builder.toString(); } } + + /** + * DmaapClient library wrapper + */ + public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + + public DmaapAafPublisherWrapper(List<String> servers, String topic, + String aafLogin, + String aafPassword, boolean useHttps) { + + super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); + } + } + + public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + public DmaapDmePublisherWrapper(List<String> servers, String topic, + String username, String password, + String environment, String aftEnvironment, String dme2Partner, + String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) { + + super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); + + + + + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + /* These are required, no defaults */ + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + // ServiceName also a default, found in additionalProps + + /* These are optional, will default to these values if not set in optionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "POST"); + + for (String key : additionalProps.keySet()) { + String value = additionalProps.get(key); + + if (value != null) + props.setProperty(key, value); + } + + this.publisher.setProps(props); + } + } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java index e36e3afc..4ac1c6fc 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java @@ -23,7 +23,6 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; import org.apache.commons.collections4.queue.CircularFifoQueue; - import org.openecomp.policy.drools.event.comm.Topic; import org.openecomp.policy.drools.event.comm.bus.BusTopic; @@ -35,13 +34,30 @@ public abstract class BusTopicBase implements BusTopic, Topic { protected String apiKey; protected String apiSecret; + protected boolean useHttps; + protected boolean allowSelfSignedCerts; protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10); + /** + * Instantiates a new Bus Topic Base + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @return a Bus Topic Base + * @throws IllegalArgumentException if invalid parameters are present + */ public BusTopicBase(List<String> servers, String topic, String apiKey, - String apiSecret) + String apiSecret, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (servers == null || servers.isEmpty()) { @@ -57,6 +73,8 @@ public abstract class BusTopicBase implements BusTopic, Topic { this.apiKey = apiKey; this.apiSecret = apiSecret; + this.useHttps = useHttps; + this.allowSelfSignedCerts = allowSelfSignedCerts; } /** @@ -92,6 +110,20 @@ public abstract class BusTopicBase implements BusTopic, Topic { } /** + * @return useHttps + */ + public boolean isUseHttps(){ + return useHttps; + } + + /** + * @return allowSelfSignedCerts + */ + public boolean isAllowSelfSignedCerts(){ + return allowSelfSignedCerts; + } + + /** * @return the recentEvents */ @Override @@ -104,8 +136,13 @@ public abstract class BusTopicBase implements BusTopic, Topic { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=") - .append(apiKey).append(", apiSecret=").append(apiSecret).append("]"); + builder.append("UebTopicBase [servers=").append(servers) + .append(", topic=").append(topic) + .append(", apiKey=").append(apiKey) + .append(", apiSecret=").append(apiSecret) + .append(", useHttps=").append(useHttps) + .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts) + .append("]"); return builder.toString(); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java index bd88818b..a78de716 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java @@ -83,13 +83,15 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * @param apiKey api secret * @param apiSecret api secret * @param partitionId partition id + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * @throws IllegalArgumentException in invalid parameters are passed in */ public InlineBusTopicSink(List<String> servers, String topic, - String apiKey, String apiSecret, String partitionId) + String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret); + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); if (partitionId == null || partitionId.isEmpty()) { this.partitionId = UUID.randomUUID ().toString(); diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 417c6d47..f5a3dc11 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -21,11 +21,12 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; +import java.util.Map; -import org.openecomp.policy.drools.event.comm.Topic; -import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; /** * This implementation publishes events for the associated DMAAP topic, @@ -39,13 +40,68 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop protected final String userName; protected final String password; + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String,String> additionalProps = null; + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public InlineDmaapTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String partitionKey, + String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String,String> additionalProps, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + + this.init(); + } + public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, String password, - String partitionKey) + String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret, partitionKey); + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); this.userName = userName; this.password = password; @@ -54,11 +110,24 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop @Override public void init() { - this.publisher = - new BusPublisher.DmaapPublisherWrapper(this.servers, + if ((this.environment == null || this.environment.isEmpty()) && + (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && + (this.latitude == null || this.latitude.isEmpty()) && + (this.longitude == null || this.longitude.isEmpty()) && + (this.partner == null || this.partner.isEmpty())) { + this.publisher = + new BusPublisher.DmaapAafPublisherWrapper(this.servers, this.topic, this.userName, - this.password); + this.password, this.useHttps); + } else { + this.publisher = + new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, + this.userName, this.password, + this.environment, this.aftEnvironment, + this.partner, this.latitude, this.longitude, + this.additionalProps, this.useHttps); + } if (logger.isInfoEnabled()) logger.info("DMAAP SINK TOPIC created " + this); } @@ -81,4 +150,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop return builder.toString(); } -} +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java index 2d4b1552..c93e0f2b 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java @@ -46,6 +46,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi * @param apiKey the api key (optional) * @param apiSecret the api secret (optional) * @param partitionId the partition key (optional, autogenerated if not provided) + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException if invalid arguments are detected */ @@ -53,9 +55,11 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi String topic, String apiKey, String apiSecret, - String partitionId) + String partitionId, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret, partitionId); + super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); } /** @@ -68,7 +72,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret); + this.apiSecret, + this.useHttps); if (logger.isInfoEnabled()) logger.info("UEB SINK TOPIC created " + this); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index f37c349e..d3be9163 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -98,6 +98,7 @@ public abstract class SingleThreadedBusTopicSource */ protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>(); + /** * * @param servers Bus servers @@ -108,6 +109,8 @@ public abstract class SingleThreadedBusTopicSource * @param consumerInstance Bus Reader Instance * @param fetchTimeout Bus fetch timeout * @param fetchLimit Bus fetch limit + * @param useHttps does the bus use https + * @param allowSelfSignedCerts are self-signed certificates allowed * @throws IllegalArgumentException An invalid parameter passed in */ public SingleThreadedBusTopicSource(List<String> servers, @@ -117,10 +120,12 @@ public abstract class SingleThreadedBusTopicSource String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit) + int fetchLimit, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret); + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); if (consumerGroup == null || consumerGroup.isEmpty()) { this.consumerGroup = UUID.randomUUID ().toString(); @@ -145,6 +150,7 @@ public abstract class SingleThreadedBusTopicSource } else { this.fetchLimit = fetchLimit; } + } /** diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index e65d44a7..2ced5bcb 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -21,10 +21,11 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; +import java.util.Map; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; import org.openecomp.policy.drools.event.comm.Topic; import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource; -import org.openecomp.policy.common.logging.eelf.PolicyLogger; /** * This topic reader implementation specializes in reading messages @@ -33,10 +34,21 @@ import org.openecomp.policy.common.logging.eelf.PolicyLogger; public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { + + protected boolean allowSelfSignedCerts; protected final String userName; protected final String password; private String className = SingleThreadedDmaapTopicSource.class.getName(); + + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String,String> additionalProps = null; + /** * * @param servers DMaaP servers @@ -47,19 +59,75 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource * @param consumerInstance DMaaP Reader Instance * @param fetchTimeout DMaaP fetch timeout * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String,String> additionalProps, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + try { + this.init(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException(e); + } + } + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * @throws IllegalArgumentException An invalid parameter passed in */ public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, String userName, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); this.userName = userName; this.password = password; @@ -78,22 +146,35 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource */ @Override public void init() throws Exception { - if (this.userName == null || this.userName.isEmpty() || - this.password == null || this.password.isEmpty()) { - this.consumer = - new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); - } else { - this.consumer = - new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.userName, this.password, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); - } + this.password == null || this.password.isEmpty()) { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); + } else if ((this.environment == null || this.environment.isEmpty()) && + (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && + (this.latitude == null || this.latitude.isEmpty()) && + (this.longitude == null || this.longitude.isEmpty()) && + (this.partner == null || this.partner.isEmpty())) { + this.consumer = + new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, this.useHttps); + } else { + this.consumer = + new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, + this.environment, this.aftEnvironment, this.partner, + this.latitude, this.longitude, this.additionalProps, this.useHttps); + } PolicyLogger.info(className, "CREATION: " + this); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java index edb55c75..641eddaa 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -42,18 +42,24 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource * @param consumerInstance UEB Reader Instance * @param fetchTimeout UEB fetch timeout * @param fetchLimit UEB fetch limit + * @param useHttps does topicSource use HTTPS? + * @param allowSelfSignedCerts does topicSource allow self-signed certs? + * * @throws IllegalArgumentException An invalid parameter passed in */ + + public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + this.allowSelfSignedCerts = allowSelfSignedCerts; this.init(); } @@ -67,7 +73,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); + this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); } /** @@ -77,6 +83,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource public CommInfrastructure getTopicCommInfrastructure() { return Topic.CommInfrastructure.UEB; } + @Override public String toString() { |