summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/openecomp
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java7
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java205
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java235
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java38
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java36
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java268
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java229
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java45
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java6
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java85
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java11
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java10
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java117
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java13
14 files changed, 1177 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
index 7a2e9711..a3c2230d 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
@@ -31,12 +31,7 @@ public interface TopicListener {
* @param commType communication infrastructure type
* @param topic topic name
* @param event event message as a string
- *
- * @return boolean. True if the invoking event dispatcher should continue
- * dispatching the event to subsequent listeners. False if it is requested
- * to the invoking event dispatcher to stop dispatching the same event to
- * other listeners of less priority. This mechanism is generally not used.
*/
- public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
+ public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
index 5b4cfd42..c3d02d14 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
@@ -24,17 +24,64 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
import org.openecomp.policy.drools.properties.PolicyProperties;
/**
* DMAAP Topic Sink Factory
*/
public interface DmaapTopicSinkFactory {
+ public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ public final String DME2_VERSION_PROPERTY = "Version";
+ public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+
+ /**
+ * Instantiates a new DMAAP Topic Sink
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName AAF user name
+ * @param password AAF password
+ * @param partitionKey Consumer Group
+ * @param environment DME2 environment
+ * @param aftEnvironment DME2 AFT environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 latitude
+ * @param longitude DME2 longitude
+ * @param additionalProps additional properties to pass to DME2
+ * @param managed is this sink endpoint managed?
+ *
+ * @return an DMAAP Topic Sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSink build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String partitionKey,
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts) ;
/**
* Instantiates a new DMAAP Topic Sink
@@ -58,7 +105,9 @@ public interface DmaapTopicSinkFactory {
String userName,
String password,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -142,7 +191,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String userName,
String password,
String partitionKey,
- boolean managed)
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (topic == null || topic.isEmpty()) {
@@ -158,7 +215,45 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
new InlineDmaapTopicSink(servers, topic,
apiKey, apiSecret,
userName, password,
- partitionKey);
+ partitionKey,
+ environment, aftEnvironment,
+ partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
+
+ if (managed)
+ dmaapTopicWriters.put(topic, dmaapTopicSink);
+ return dmaapTopicSink;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DmaapTopicSink build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String partitionKey,
+ boolean managed,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (dmaapTopicWriters.containsKey(topic)) {
+ return dmaapTopicWriters.get(topic);
+ }
+
+ DmaapTopicSink dmaapTopicSink =
+ new InlineDmaapTopicSink(servers, topic,
+ apiKey, apiSecret,
+ userName, password,
+ partitionKey, useHttps, allowSelfSignedCerts);
if (managed)
dmaapTopicWriters.put(topic, dmaapTopicSink);
@@ -172,7 +267,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
*/
@Override
public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
- return this.build(servers, topic, null, null, null, null, null, true);
+ return this.build(servers, topic, null, null, null, null, null, true, false, false);
}
@@ -196,12 +291,10 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." +
topic +
PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("No DMAAP servers provided in " + properties);
- continue;
- }
- List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ List<String> serverList;
+ if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ else serverList = new ArrayList<>();
String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
"." + topic +
@@ -223,14 +316,100 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+
+ /* DME2 Properties */
+
+ String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+ String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+ String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+ String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+ String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+ String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+ String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+ String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+ String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
+ + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+ String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+ String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+ String dme2SessionStickinessRequired = properties
+ .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+ Map<String,String> dme2AdditionalProps = new HashMap<>();
+
+ if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+ if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
+ dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+ if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+ if (dme2Version != null && !dme2Version.isEmpty())
+ dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+ if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
+ dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+ if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
+ dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+
+ if (servers == null || servers.isEmpty()) {
+ logger.error("No DMaaP servers or DME2 ServiceName provided");
+ continue;
+ }
+
boolean managed = true;
if (managedString != null && !managedString.isEmpty()) {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
DmaapTopicSink dmaapTopicSink = this.build(serverList, topic,
- apiKey, apiSecret, aafMechId, aafPassword,
- partitionKey, managed);
+ apiKey, apiSecret,
+ aafMechId, aafPassword,
+ partitionKey,
+ dme2Environment, dme2AftEnvironment,
+ dme2Partner, dme2Latitude, dme2Longitude,
+ dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
+
dmaapTopicWriters.add(dmaapTopicSink);
}
return dmaapTopicWriters;
@@ -305,4 +484,4 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
return writers;
}
-}
+} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
index f8d85eb7..9f60556c 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
@@ -24,16 +24,26 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
import org.openecomp.policy.drools.properties.PolicyProperties;
+
/**
* DMAAP Topic Source Factory
*/
public interface DmaapTopicSourceFactory {
+ public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ public final String DME2_VERSION_PROPERTY = "Version";
+ public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
* Creates an DMAAP Topic Source based on properties files
@@ -60,6 +70,8 @@ public interface DmaapTopicSourceFactory {
* @param fetchTimeout Read Fetch Timeout
* @param fetchLimit Fetch Limit
* @param managed is this endpoind managed?
+ * @param useHttps does the connection use HTTPS?
+ * @param allowSelfSignedCerts does connection allow self-signed certificates?
*
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
@@ -74,7 +86,56 @@ public interface DmaapTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
+ throws IllegalArgumentException;
+
+ /**
+ * Instantiates a new DMAAP Topic Source
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName user name
+ * @param password password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Read Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @param environment DME2 environment
+ * @param aftEnvironment DME2 AFT environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 latitude
+ * @param longitude DME2 longitude
+ * @param additionalProps additional properties to pass to DME2
+ * @param managed is this endpoind managed?
+ * @param useHttps does the connection use HTTPS?
+ * @param allowSelfSignedCerts does connection allow self-signed certificates?
+ *
+ * @return an DMAAP Topic Source
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSource build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String consumerGroup,
+ String consumerInstance,
+ int fetchTimeout,
+ int fetchLimit,
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -104,7 +165,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers,
+ public DmaapTopicSource build(List<String> servers,
String topic)
throws IllegalArgumentException;
@@ -154,7 +215,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
new HashMap<String, DmaapTopicSource>();
-
+
/**
* {@inheritDoc}
*/
@@ -169,7 +230,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (topic == null || topic.isEmpty()) {
@@ -185,7 +254,53 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
new SingleThreadedDmaapTopicSource(servers, topic,
apiKey, apiSecret, userName, password,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit,
+ environment, aftEnvironment, partner,
+ latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
+
+ if (managed)
+ dmaapTopicSources.put(topic, dmaapTopicSource);
+
+ return dmaapTopicSource;
+ }
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DmaapTopicSource build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String consumerGroup,
+ String consumerInstance,
+ int fetchTimeout,
+ int fetchLimit,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("DMaaP Server(s) must be provided");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized(this) {
+ if (dmaapTopicSources.containsKey(topic)) {
+ return dmaapTopicSources.get(topic);
+ }
+
+ DmaapTopicSource dmaapTopicSource =
+ new SingleThreadedDmaapTopicSource(servers, topic,
+ apiKey, apiSecret, userName, password,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
if (managed)
dmaapTopicSources.put(topic, dmaapTopicSource);
@@ -216,12 +331,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
topic +
PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("No UEB servers provided in " + properties);
- continue;
- }
-
- List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ List<String> serverList;
+ if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ else serverList = new ArrayList<>();
String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
"." + topic +
@@ -250,6 +362,70 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
"." + topic +
PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+
+ /* DME2 Properties */
+
+ String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+ String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+ String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+ String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+ String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+ String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+ String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+ String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+ String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+ String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+ String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+ String dme2SessionStickinessRequired = properties
+ .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+ Map<String,String> dme2AdditionalProps = new HashMap<>();
+
+ if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+ if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
+ dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+ if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+ if (dme2Version != null && !dme2Version.isEmpty())
+ dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+ if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
+ dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+ if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
+ dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+
+
+ if (servers == null || servers.isEmpty()) {
+
+ logger.error("No DMaaP servers or DME2 ServiceName provided");
+ continue;
+ }
+
int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
try {
@@ -279,10 +455,33 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
+
DmaapTopicSource uebTopicSource = this.build(serverList, topic,
apiKey, apiSecret, aafMechId, aafPassword,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, managed);
+ fetchTimeout, fetchLimit,
+ dme2Environment, dme2AftEnvironment, dme2Partner,
+ dme2Latitude, dme2Longitude, dme2AdditionalProps,
+ managed, useHttps, allowSelfSignedCerts);
+
dmaapTopicSource_s.add(uebTopicSource);
}
}
@@ -291,25 +490,29 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
/**
* {@inheritDoc}
+ * @throws IllegalArgumentException
*/
@Override
public DmaapTopicSource build(List<String> servers,
String topic,
String apiKey,
- String apiSecret) {
+ String apiSecret) throws IllegalArgumentException {
return this.build(servers, topic,
apiKey, apiSecret, null, null,
null, null,
DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
DmaapTopicSource.DEFAULT_LIMIT_FETCH,
- true);
+ true,
+ false,
+ false);
}
/**
* {@inheritDoc}
+ * @throws IllegalArgumentException
*/
@Override
- public DmaapTopicSource build(List<String> servers, String topic) {
+ public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
return this.build(servers, topic, null, null);
}
@@ -352,7 +555,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
if (dmaapTopicSources.containsKey(topic)) {
return dmaapTopicSources.get(topic);
} else {
- throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found");
+ throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
}
}
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
index 85b98838..432f035c 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
@@ -54,7 +54,9 @@ public interface UebTopicSinkFactory {
String apiKey,
String apiSecret,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -135,9 +137,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
String apiKey,
String apiSecret,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("UEB Server(s) must be provided");
+ }
+
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("A topic must be provided");
}
@@ -149,7 +157,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
UebTopicSink uebTopicWriter =
new InlineUebTopicSink(servers, topic,
- apiKey, apiSecret,partitionKey);
+ apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts);
if (managed)
uebTopicSinks.put(topic, uebTopicWriter);
@@ -164,7 +172,8 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
*/
@Override
public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
- return this.build(servers, topic, null, null, null, true);
+
+ return this.build(servers, topic, null, null, null, true, false, false);
}
@@ -212,9 +221,28 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
UebTopicSink uebTopicWriter = this.build(serverList, topic,
apiKey, apiSecret,
- partitionKey, managed);
+ partitionKey, managed, useHttps, allowSelfSignedCerts);
uebTopicWriters.add(uebTopicWriter);
}
return uebTopicWriters;
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
index bf2a4038..1729576f 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
@@ -71,7 +71,9 @@ public interface UebTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -162,8 +164,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("UEB Server(s) must be provided");
+ }
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("A topic must be provided");
@@ -178,7 +185,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
new SingleThreadedUebTopicSource(servers, topic,
apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
if (managed)
uebTopicSources.put(topic, uebTopicSource);
@@ -263,10 +270,28 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
UebTopicSource uebTopicSource = this.build(serverList, topic,
apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, managed);
+ fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
uebTopicSources.add(uebTopicSource);
}
}
@@ -281,11 +306,12 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
String topic,
String apiKey,
String apiSecret) {
+
return this.build(servers, topic,
apiKey, apiSecret,
null, null,
UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
- UebTopicSource.DEFAULT_LIMIT_FETCH, true);
+ UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
}
/**
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
index 6fee5ce0..a34d361b 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -22,12 +22,20 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
@@ -76,17 +84,40 @@ public interface BusConsumer {
public CambriaConsumerWrapper(List<String> servers, String topic,
String apiKey, String apiSecret,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
throws IllegalArgumentException {
ConsumerBuilder builder =
new CambriaClientBuilders.ConsumerBuilder();
- builder.knownAs(consumerGroup, consumerInstance)
+
+ if (useHttps){
+
+ if(useSelfSignedCerts){
+ builder.knownAs(consumerGroup, consumerInstance)
+ .usingHosts(servers)
+ .onTopic(topic)
+ .waitAtServer(fetchTimeout)
+ .receivingAtMost(fetchLimit)
+ .usingHttps()
+ .allowSelfSignedCertificates();
+ }
+ else{
+ builder.knownAs(consumerGroup, consumerInstance)
+ .usingHosts(servers)
+ .onTopic(topic)
+ .waitAtServer(fetchTimeout)
+ .receivingAtMost(fetchLimit)
+ .usingHttps();
+ }
+ }
+ else{
+ builder.knownAs(consumerGroup, consumerInstance)
.usingHosts(servers)
.onTopic(topic)
.waitAtServer(fetchTimeout)
.receivingAtMost(fetchLimit);
+ }
if (apiKey != null && !apiKey.isEmpty() &&
apiSecret != null && !apiSecret.isEmpty()) {
@@ -123,8 +154,11 @@ public interface BusConsumer {
/**
* MR based consumer
*/
- public static class DmaapConsumerWrapper implements BusConsumer {
+ public abstract class DmaapConsumerWrapper implements BusConsumer {
+ protected int fetchTimeout;
+ protected Object closeCondition = new Object();
+
/**
* MR Consumer
*/
@@ -137,47 +171,75 @@ public interface BusConsumer {
* @param topic topic
* @param apiKey API Key
* @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
+ * @param username AAF Login
+ * @param password AAF Password
* @param consumerGroup Consumer Group
* @param consumerInstance Consumer Instance
* @param fetchTimeout Fetch Timeout
* @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
*/
+ @SuppressWarnings("unchecked")
public DmaapConsumerWrapper(List<String> servers, String topic,
String apiKey, String apiSecret,
- String aafLogin, String aafPassword,
+ String username, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
- throws Exception {
+ int fetchTimeout, int fetchLimit, boolean useHttps)
+
+ throws MalformedURLException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
this.consumer = new MRConsumerImpl(servers, topic,
consumerGroup, consumerInstance,
fetchTimeout, fetchLimit,
null, apiKey, apiSecret);
- this.consumer.setUsername(aafLogin);
- this.consumer.setPassword(aafPassword);
+ this.consumer.setUsername(username);
+ this.consumer.setPassword(password);
+
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
- this.consumer.setProps(props);
- this.consumer.setHost(servers.get(0) + ":3904");;
}
/**
* {@inheritDoc}
*/
public Iterable<String> fetch() throws Exception {
- return this.consumer.fetch();
+ MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+
+ if (PolicyLogger.isDebugEnabled() && response != null)
+ PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
+
+ if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
+ if (response.getResponseCode() == null)
+ PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null");
+ else
+ PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+ }
+
+ if (response.getActualMessages() == null)
+ return new ArrayList<String>();
+ else
+ return response.getActualMessages();
}
/**
* {@inheritDoc}
*/
public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
this.consumer.close();
}
@@ -196,7 +258,181 @@ public interface BusConsumer {
}
}
+ /**
+ * MR based consumer
+ */
+ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+ private Properties props;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapAafConsumerWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String aafLogin, String aafPassword,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
+
+ super(servers, topic, apiKey, apiSecret,
+ aafLogin, aafPassword,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps);
+
+ // super constructor sets servers = {""} if empty to avoid errors when using DME2
+ if ((servers.size() == 1 && servers.get(0).equals("")) ||
+ (servers == null) || (servers.size() == 0)) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ props = new Properties();
+
+ if(useHttps){
+ props.setProperty("Protocol", "https");
+ this.consumer.setHost(servers.get(0) + ":3905");
+
+ }
+ else{
+ props.setProperty("Protocol", "http");
+ this.consumer.setHost(servers.get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
+
+ builder.
+ append("DmaapConsumerWrapper [").
+ append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
+ append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
+ append(", consumer.getHost()=").append(consumer.getHost()).
+ append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
+ append(", consumer.getUsername()=").append(consumer.getUsername()).
+ append("]");
+ return builder.toString();
+ }
+ }
+ public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
+ private Properties props;
+
+ public DmaapDmeConsumerWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String dme2Login, String dme2Password,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit,
+ String environment, String aftEnvironment, String dme2Partner,
+ String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
+
+
+
+ super(servers, topic, apiKey, apiSecret,
+ dme2Login, dme2Password,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps);
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ } if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
+ PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ this.consumer.setUsername(dme2Login);
+ this.consumer.setPassword(dme2Password);
+
+ props = new Properties();
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ props.setProperty("username", dme2Login);
+ props.setProperty("password", dme2Password);
+
+ /* These are required, no defaults */
+ props.setProperty("topic", topic);
+
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "GET");
+
+ if(useHttps){
+ props.setProperty("Protocol", "https");
+
+ }
+ else{
+ props.setProperty("Protocol", "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+
+ if (additionalProps != null) {
+ for(String key : additionalProps.keySet())
+ props.put(key, additionalProps.get(key));
+ }
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
index 798bf989..b5595b2d 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -24,14 +24,20 @@ import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.LoggerFactory;
+
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -65,15 +71,21 @@ public interface BusPublisher {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
- String apiSecret)
- throws IllegalArgumentException {
+ String apiSecret, boolean useHttps) throws IllegalArgumentException {
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(servers)
+
+
+ if (useHttps){
+
+ builder.usingHosts(servers)
+ .onTopic(topic)
+ .usingHttps();
+ }
+ else{
+ builder.usingHosts(servers)
.onTopic(topic);
-
- // Only supported in 0.2.4 version
- // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER);
+ }
+
if (apiKey != null && !apiKey.isEmpty() &&
apiSecret != null && !apiSecret.isEmpty()) {
@@ -142,43 +154,105 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper
*/
- public static class DmaapPublisherWrapper implements BusPublisher {
+ public abstract class DmaapPublisherWrapper implements BusPublisher {
/**
* MR based Publisher
*/
protected MRSimplerBatchPublisher publisher;
+ protected Properties props;
- public DmaapPublisherWrapper(List<String> servers, String topic,
- String aafLogin,
- String aafPassword) {
+ /**
+ * MR Publisher Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param username AAF or DME2 Login
+ * @param password AAF or DME2 Password
+ */
+ public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
+ List<String> servers, String topic,
+ String username,
+ String password, boolean useHttps) throws IllegalArgumentException {
+
- ArrayList<String> dmaapServers = new ArrayList<String>();
- for (String server: servers) {
- dmaapServers.add(server + ":3904");
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
}
-
- this.publisher =
- new MRSimplerBatchPublisher.Builder().
- againstUrls(dmaapServers).
- onTopic(topic).
- build();
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ if (servers == null || servers.isEmpty())
+ throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
+
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ if(useHttps){
+ for (String server: servers) {
+ dmaapServers.add(server + ":3905");
+ }
+
+ }
+ else{
+ for (String server: servers) {
+ dmaapServers.add(server + ":3904");
+ }
+ }
+
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ } else if (protocol == ProtocolTypeConstants.DME2) {
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ dmaapServers.add("0.0.0.0:3904");
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ }
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
- this.publisher.setUsername(aafLogin);
- this.publisher.setPassword(aafPassword);
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
+ props = new Properties();
+
+ if(useHttps){
+
+ props.setProperty("Protocol", "https");
+ }
+ else{
+
+ props.setProperty("Protocol", "http");
+
+ }
+
props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
this.publisher.setProps(props);
- this.publisher.setHost(servers.get(0));
+ if (protocol == ProtocolTypeConstants.AAF_AUTH)
+ this.publisher.setHost(servers.get(0));
- if (PolicyLogger.isInfoEnabled())
+ if (PolicyLogger.isInfoEnabled()) {
PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
"CREATION: " + this);
+ PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
+ "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue());
+ }
+
}
/**
@@ -208,7 +282,16 @@ public interface BusPublisher {
if (message == null)
throw new IllegalArgumentException("No message provided");
+ this.publisher.setPubResponse(new MRPublisherResponse());
this.publisher.send(partitionId, message);
+ MRPublisherResponse response = this.publisher.sendBatchWithResponse();
+ if (PolicyLogger.isDebugEnabled() && response != null) {
+ PolicyLogger.debug(DmaapPublisherWrapper.class.getName(),
+ "DMaaP publisher received " + response.getResponseCode() + ": "
+ + response.getResponseMessage());
+
+ }
+
return true;
}
@@ -227,5 +310,97 @@ public interface BusPublisher {
return builder.toString();
}
}
+
+ /**
+ * DmaapClient library wrapper
+ */
+ public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ /**
+ * MR based Publisher
+ */
+ protected MRSimplerBatchPublisher publisher;
+
+ public DmaapAafPublisherWrapper(List<String> servers, String topic,
+ String aafLogin,
+ String aafPassword, boolean useHttps) {
+
+ super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
+ }
+ }
+
+ public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ public DmaapDmePublisherWrapper(List<String> servers, String topic,
+ String username, String password,
+ String environment, String aftEnvironment, String dme2Partner,
+ String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
+
+ super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
+
+
+
+
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ } if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
+ PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ /* These are required, no defaults */
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ // ServiceName also a default, found in additionalProps
+
+ /* These are optional, will default to these values if not set in optionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "POST");
+
+ for (String key : additionalProps.keySet()) {
+ String value = additionalProps.get(key);
+
+ if (value != null)
+ props.setProperty(key, value);
+ }
+
+ this.publisher.setProps(props);
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
index e36e3afc..4ac1c6fc 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
@@ -23,7 +23,6 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
import org.apache.commons.collections4.queue.CircularFifoQueue;
-
import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.BusTopic;
@@ -35,13 +34,30 @@ public abstract class BusTopicBase implements BusTopic, Topic {
protected String apiKey;
protected String apiSecret;
+ protected boolean useHttps;
+ protected boolean allowSelfSignedCerts;
protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+ /**
+ * Instantiates a new Bus Topic Base
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @return a Bus Topic Base
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
public BusTopicBase(List<String> servers,
String topic,
String apiKey,
- String apiSecret)
+ String apiSecret,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (servers == null || servers.isEmpty()) {
@@ -57,6 +73,8 @@ public abstract class BusTopicBase implements BusTopic, Topic {
this.apiKey = apiKey;
this.apiSecret = apiSecret;
+ this.useHttps = useHttps;
+ this.allowSelfSignedCerts = allowSelfSignedCerts;
}
/**
@@ -92,6 +110,20 @@ public abstract class BusTopicBase implements BusTopic, Topic {
}
/**
+ * @return useHttps
+ */
+ public boolean isUseHttps(){
+ return useHttps;
+ }
+
+ /**
+ * @return allowSelfSignedCerts
+ */
+ public boolean isAllowSelfSignedCerts(){
+ return allowSelfSignedCerts;
+ }
+
+ /**
* @return the recentEvents
*/
@Override
@@ -104,8 +136,13 @@ public abstract class BusTopicBase implements BusTopic, Topic {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=")
- .append(apiKey).append(", apiSecret=").append(apiSecret).append("]");
+ builder.append("UebTopicBase [servers=").append(servers)
+ .append(", topic=").append(topic)
+ .append(", apiKey=").append(apiKey)
+ .append(", apiSecret=").append(apiSecret)
+ .append(", useHttps=").append(useHttps)
+ .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts)
+ .append("]");
return builder.toString();
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
index bd88818b..a78de716 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -83,13 +83,15 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* @param apiKey api secret
* @param apiSecret api secret
* @param partitionId partition id
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
* @throws IllegalArgumentException in invalid parameters are passed in
*/
public InlineBusTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret, String partitionId)
+ String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret);
+ super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
if (partitionId == null || partitionId.isEmpty()) {
this.partitionId = UUID.randomUUID ().toString();
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index 417c6d47..f5a3dc11 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -21,11 +21,12 @@
package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
+import java.util.Map;
-import org.openecomp.policy.drools.event.comm.Topic;
-import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
/**
* This implementation publishes events for the associated DMAAP topic,
@@ -39,13 +40,68 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
protected final String userName;
protected final String password;
+ protected String environment = null;
+ protected String aftEnvironment = null;
+ protected String partner = null;
+ protected String latitude = null;
+ protected String longitude = null;
+
+ protected Map<String,String> additionalProps = null;
+
+ /**
+ *
+ * @param servers DMaaP servers
+ * @param topic DMaaP Topic to be monitored
+ * @param apiKey DMaaP API Key (optional)
+ * @param apiSecret DMaaP API Secret (optional)
+ * @param consumerGroup DMaaP Reader Consumer Group
+ * @param consumerInstance DMaaP Reader Instance
+ * @param fetchTimeout DMaaP fetch timeout
+ * @param fetchLimit DMaaP fetch limit
+ * @param environment DME2 Environment
+ * @param aftEnvironment DME2 AFT Environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 Latitude
+ * @param longitude DME2 Longitude
+ * @param additionalProps Additional properties to pass to DME2
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @throws IllegalArgumentException An invalid parameter passed in
+ */
+ public InlineDmaapTopicSink(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String userName, String password,
+ String partitionKey,
+ String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String,String> additionalProps,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
+
+ this.userName = userName;
+ this.password = password;
+
+ this.environment = environment;
+ this.aftEnvironment = aftEnvironment;
+ this.partner = partner;
+
+ this.latitude = latitude;
+ this.longitude = longitude;
+
+ this.additionalProps = additionalProps;
+
+ this.init();
+ }
+
public InlineDmaapTopicSink(List<String> servers, String topic,
String apiKey, String apiSecret,
String userName, String password,
- String partitionKey)
+ String partitionKey, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret, partitionKey);
+ super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
this.userName = userName;
this.password = password;
@@ -54,11 +110,24 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
@Override
public void init() {
- this.publisher =
- new BusPublisher.DmaapPublisherWrapper(this.servers,
+ if ((this.environment == null || this.environment.isEmpty()) &&
+ (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
+ (this.latitude == null || this.latitude.isEmpty()) &&
+ (this.longitude == null || this.longitude.isEmpty()) &&
+ (this.partner == null || this.partner.isEmpty())) {
+ this.publisher =
+ new BusPublisher.DmaapAafPublisherWrapper(this.servers,
this.topic,
this.userName,
- this.password);
+ this.password, this.useHttps);
+ } else {
+ this.publisher =
+ new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
+ this.userName, this.password,
+ this.environment, this.aftEnvironment,
+ this.partner, this.latitude, this.longitude,
+ this.additionalProps, this.useHttps);
+ }
if (logger.isInfoEnabled())
logger.info("DMAAP SINK TOPIC created " + this);
}
@@ -81,4 +150,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
return builder.toString();
}
-}
+} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
index 2d4b1552..c93e0f2b 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
@@ -46,6 +46,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
* @param apiKey the api key (optional)
* @param apiSecret the api secret (optional)
* @param partitionId the partition key (optional, autogenerated if not provided)
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
*
* @throws IllegalArgumentException if invalid arguments are detected
*/
@@ -53,9 +55,11 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
String topic,
String apiKey,
String apiSecret,
- String partitionId)
+ String partitionId,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret, partitionId);
+ super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
}
/**
@@ -68,7 +72,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
this.apiKey,
- this.apiSecret);
+ this.apiSecret,
+ this.useHttps);
if (logger.isInfoEnabled())
logger.info("UEB SINK TOPIC created " + this);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index f37c349e..d3be9163 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -98,6 +98,7 @@ public abstract class SingleThreadedBusTopicSource
*/
protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
+
/**
*
* @param servers Bus servers
@@ -108,6 +109,8 @@ public abstract class SingleThreadedBusTopicSource
* @param consumerInstance Bus Reader Instance
* @param fetchTimeout Bus fetch timeout
* @param fetchLimit Bus fetch limit
+ * @param useHttps does the bus use https
+ * @param allowSelfSignedCerts are self-signed certificates allowed
* @throws IllegalArgumentException An invalid parameter passed in
*/
public SingleThreadedBusTopicSource(List<String> servers,
@@ -117,10 +120,12 @@ public abstract class SingleThreadedBusTopicSource
String consumerGroup,
String consumerInstance,
int fetchTimeout,
- int fetchLimit)
+ int fetchLimit,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret);
+ super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
if (consumerGroup == null || consumerGroup.isEmpty()) {
this.consumerGroup = UUID.randomUUID ().toString();
@@ -145,6 +150,7 @@ public abstract class SingleThreadedBusTopicSource
} else {
this.fetchLimit = fetchLimit;
}
+
}
/**
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index e65d44a7..2ced5bcb 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -21,10 +21,11 @@
package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
+import java.util.Map;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
-import org.openecomp.policy.common.logging.eelf.PolicyLogger;
/**
* This topic reader implementation specializes in reading messages
@@ -33,10 +34,21 @@ import org.openecomp.policy.common.logging.eelf.PolicyLogger;
public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
implements DmaapTopicSource, Runnable {
+
+ protected boolean allowSelfSignedCerts;
protected final String userName;
protected final String password;
private String className = SingleThreadedDmaapTopicSource.class.getName();
+
+ protected String environment = null;
+ protected String aftEnvironment = null;
+ protected String partner = null;
+ protected String latitude = null;
+ protected String longitude = null;
+
+ protected Map<String,String> additionalProps = null;
+
/**
*
* @param servers DMaaP servers
@@ -47,19 +59,75 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
* @param consumerInstance DMaaP Reader Instance
* @param fetchTimeout DMaaP fetch timeout
* @param fetchLimit DMaaP fetch limit
+ * @param environment DME2 Environment
+ * @param aftEnvironment DME2 AFT Environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 Latitude
+ * @param longitude DME2 Longitude
+ * @param additionalProps Additional properties to pass to DME2
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @throws IllegalArgumentException An invalid parameter passed in
+ */
+ public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String userName, String password,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit,
+ String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String,String> additionalProps,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ super(servers, topic, apiKey, apiSecret,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
+
+ this.userName = userName;
+ this.password = password;
+
+ this.environment = environment;
+ this.aftEnvironment = aftEnvironment;
+ this.partner = partner;
+
+ this.latitude = latitude;
+ this.longitude = longitude;
+
+ this.additionalProps = additionalProps;
+ try {
+ this.init();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ *
+ * @param servers DMaaP servers
+ * @param topic DMaaP Topic to be monitored
+ * @param apiKey DMaaP API Key (optional)
+ * @param apiSecret DMaaP API Secret (optional)
+ * @param consumerGroup DMaaP Reader Consumer Group
+ * @param consumerInstance DMaaP Reader Instance
+ * @param fetchTimeout DMaaP fetch timeout
+ * @param fetchLimit DMaaP fetch limit
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
* @throws IllegalArgumentException An invalid parameter passed in
*/
public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
String apiKey, String apiSecret,
String userName, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
this.userName = userName;
this.password = password;
@@ -78,22 +146,35 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
*/
@Override
public void init() throws Exception {
-
if (this.userName == null || this.userName.isEmpty() ||
- this.password == null || this.password.isEmpty()) {
- this.consumer =
- new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
- } else {
- this.consumer =
- new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.userName, this.password,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
- }
+ this.password == null || this.password.isEmpty()) {
+ this.consumer =
+ new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
+ } else if ((this.environment == null || this.environment.isEmpty()) &&
+ (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
+ (this.latitude == null || this.latitude.isEmpty()) &&
+ (this.longitude == null || this.longitude.isEmpty()) &&
+ (this.partner == null || this.partner.isEmpty())) {
+ this.consumer =
+ new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit, this.useHttps);
+ } else {
+ this.consumer =
+ new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit,
+ this.environment, this.aftEnvironment, this.partner,
+ this.latitude, this.longitude, this.additionalProps, this.useHttps);
+ }
PolicyLogger.info(className, "CREATION: " + this);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index edb55c75..641eddaa 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -42,18 +42,24 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
* @param consumerInstance UEB Reader Instance
* @param fetchTimeout UEB fetch timeout
* @param fetchLimit UEB fetch limit
+ * @param useHttps does topicSource use HTTPS?
+ * @param allowSelfSignedCerts does topicSource allow self-signed certs?
+ *
* @throws IllegalArgumentException An invalid parameter passed in
*/
+
+
public SingleThreadedUebTopicSource(List<String> servers, String topic,
String apiKey, String apiSecret,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
+ this.allowSelfSignedCerts = allowSelfSignedCerts;
this.init();
}
@@ -67,7 +73,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
+ this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
}
/**
@@ -77,6 +83,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
public CommInfrastructure getTopicCommInfrastructure() {
return Topic.CommInfrastructure.UEB;
}
+
@Override
public String toString() {