summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
authorRalph Straubs <rs8887@att.com>2017-04-21 04:23:55 -0500
committerRalph Straubs <rs8887@att.com>2017-04-21 05:55:16 -0500
commit5bd9ff4130799247be3be88fc02f3ce0efd014c0 (patch)
tree327ed17fae044265b08c419913290e2f9bcddcec /policy-endpoints/src/main
parent8f7e5de17521836a96fa1c771a1651f63216675c (diff)
Batch submit -- force rebuild
[ECOMPD2TD-1126 1707] Delay the start of the election handler by one more update interval to allow updatedDate to be set and recoginized by other nodes. [ECOMPD2TD-1073 1707] Remove pessimistic locks from DB access to prevent DB deadlock [US867397-policy-management-feature-api] Split original FeatureAPI into three service interfaces. These are: - the original FeatureAPI for core - PolicyControllerFeatureAPI (additional interface methods yet to come ..) - PolicyEngineFeatureAPI (additional interface methods yet to come ..) Additional extensions will be done in the future. This is to have equivalent functionality with what it is already there. [US867397-policy-management-feature-api] Split original FeatureAPI into three service interfaces. These are: - the original FeatureAPI for core - PolicyControllerFeatureAPI (additional interface methods yet to come ..) - PolicyEngineFeatureAPI (additional interface methods yet to come ..) Additional extensions will be done in the future. This is to have equivalent functionality with what it is already there. [ECOMPD2TD-1167] Avoid redundant dependencies and remove obsolete sql scripts [ECOMPD2TD-848] Fix copied from ATT droolspdp repo -- add timestamp to metric and audit logs [ECOMPD2TD-1167] update jetty version to 9.3.14 [ECOMPD2TD-1167] pull in policy-core jar as a dependency of policy-management, instead of zip file [ECOMPD2TD-1159] Add property to ignore repository audit errors The property is called 'repository.audit.ignore.errors' -- it resides in 'IntegrityMonitor.properties', and it defaults to 'true'. When true, any errors that occur in the repository audit are logged, but not reported to integrity monitor. [US867397] additional hooks for policy engine, and support operation take over by feature bypassing further feature processing [US867397] additional hooks for policy engine, and support operation take over by feature bypassing further feature processing [US867397] fix reverse return javadoc comment [ECOMPD2TD-1192] Select correct persistent session information This is a modification to 'DroolsPdpsElectionHandler.DesignatedWaiter'. When going active, the list of persistent sessions needs to come from the most recent active DroolsPDP. The most recent active DroolsPDP is tracked in a local variable 'mostRecentPrimary', and the associated list of sessions needs to be extracted just prior to going active. The problem was that the list of sessions was extracted after the current DroolsPDP was set to 'mostRecentPrimary'. [ECOMPD2TD-1073 1707] Remove pessimistic locks from DB access to prevent DB deadlock [US860546] Add workaround for missing logger in message router [US860546] add DME2 properties to PolicyProperties [US860546] add DME2 support, pending unit testing [US860546] updated unit tests for dmaap/DME2 [US860546] removed System.out.println lines from BusConsumer and BusPublisher [US860546] modifications and additions to TopicEndpoint tests [US860546] remove block to append /events to serviceName in BusConsumer, BusPublisher [US860546] removing dmaap unit tests with ATT references [ECOMPD2TD-1167] Audit release repository by default [ECOMPD2TD-000] Cherry-pick versioning fixes from 1707.39.1 release branch [ECOMPD2TD-000] Fix policy-endpoints cambriaClient version [US865296] ECOMP Policy Logging Compliance, remove redundant timestamp from metric and audit log for logging compliance [US865296] ECOMP Policy Logging Compliance, remove TargetVirtualEntity from audit log, provide name to un-used column [US865296] ECOMP Policy Logging Compliance, remove Unused from AuditLogPattern, leave the column blank means un-used. [US000000] Cleanup and performance improvements 1) Changed 'OrderedServiceImpl' to ensure that the same service instance is used in every queue. This gets around the 'ServiceLoader' behavior that generates a new instance for each interface the service implements. 2) In 'PolicySession': add adaptive poll times within Drools sessions. Instead of always polling every 5 seconds, poll frequency increases when rules fire, and decreases when they don't. At present, the poll delay ranges from 100 milliseconds to 5 seconds, but at some point, it would be nice to make this configurable. 3) In 'PolicySession': add tests for 'logger.isDebugEnabled()' in session listener methods -- this results in a significant amount of real-time savings under load. [ECOMPD2TD-1126 1707] Changes to the election handler to deal with situations (race conditions) which can affect the site choice [ECOMPD2TD-1126 1707] Clean up of site afinity logic. Minor changes and a fix of the case where all pdpd are designated or all are hot standby. [ECOMPD2TD-1126 1707] Fixed issue with election handler and got all StandbyStateManagmentTest JUnits working. [ECOMPD2TD-1126 1707] Hopefully final changes to DroolsPdpsElectionHandler and StateManagementTest. Also removed the ResiliencyTestCases since they were redundant. [ECOMPD2TD-1126 1707] Found a minor bug in the computeMostRecentPrimary algorithm which I corrected and added a JUnit to confirm. [ECOMPD2TD-1126 1707] Removed code that was commented out of election handler to improve reability. Added an explanatory note to StandbyStateManagementTest. [US869183] Convert integrity monitor test endpoint to use HttpServletServer [US869183] add swagger-maven-plugin, update swagger annotation for IM test interface [US860371] Modified existing Dmaap and Ueb code to allow for https connectivity. HTTP/HTTPS is configurable via controller.properties file [US860520] Removed extra useHttps() method call from BusConsumer class where the consumer should be using http. Also, restored pom.xml to version at time feature branch was created. [US860520] Removed TopicEndpointTest.java. Added the ability to configure use of self-signed certificates for consumer topics. Functionality cannot be added for publisher at this time. Removed useHttps parameter field from multiple .builder methods to allow for greater ability to build source/sink objects without specifying http vs https connectivity. Other code cleanup and comments [US860520] -Deleted sys.out statements used in testing from BusConsumer/BusPublisher -Moved setProps method outside of if/else stub in BusConsumer -Could not add useHttps or selfSignedCerts to toString methods in BusConsumer -Move setProperty(contenttype, app/json) call outside of if/else branch in BusPublisher -Renamed doesAllowSelfSignedCerts method to is isAllowSelfSignedCerts and moved to BusTopicBase -Moved allowSelfSignedCerts member variable to BusTopicBase class. BusTopicBase is ancestor of SingleThreadedDmapp*/SingleThreadedUeb*/SingleThreadedBus* classes therefore, it will be inherited. -This changed resulted in cascading changes to the subclasses of BusTopicbase -Default is now set to disallow self-signed certs -Added several javadoc param annotations [US869183 1707] Changes to satisfy dependencies and correct pom syntax [ECOMPD2TD-1280] move DroolsPDPIntegrityMonitor.RestManager to its own class [ECOMPD2TD-1278] Resolve duplicate dependency issues (cherry-pick from 1707.40.1) [ECOMPD2TD-1278] revert cambriaClient back to 0.0.1 [ECOMPD2TD-1278] put dmaap version back to 0.2.12 [ECOMPD2TD-1280 1707] Fix Commons-lang3 version and common-modules version Conflicts: policy-endpoints/pom.xml policy-healthcheck/pom.xml policy-persistence/pom.xml pom.xml Change-Id: I803554bc64c55d2e82a3d6ad9120757f287144fb Signed-off-by: Ralph Straubs <rs8887@att.com>
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java7
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java205
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java235
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java38
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java36
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java268
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java229
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java45
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java6
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java85
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java11
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java10
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java117
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java13
14 files changed, 1177 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
index 7a2e9711..a3c2230d 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
@@ -31,12 +31,7 @@ public interface TopicListener {
* @param commType communication infrastructure type
* @param topic topic name
* @param event event message as a string
- *
- * @return boolean. True if the invoking event dispatcher should continue
- * dispatching the event to subsequent listeners. False if it is requested
- * to the invoking event dispatcher to stop dispatching the same event to
- * other listeners of less priority. This mechanism is generally not used.
*/
- public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
+ public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
index 5b4cfd42..c3d02d14 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
@@ -24,17 +24,64 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
import org.openecomp.policy.drools.properties.PolicyProperties;
/**
* DMAAP Topic Sink Factory
*/
public interface DmaapTopicSinkFactory {
+ public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ public final String DME2_VERSION_PROPERTY = "Version";
+ public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+
+ /**
+ * Instantiates a new DMAAP Topic Sink
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName AAF user name
+ * @param password AAF password
+ * @param partitionKey Consumer Group
+ * @param environment DME2 environment
+ * @param aftEnvironment DME2 AFT environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 latitude
+ * @param longitude DME2 longitude
+ * @param additionalProps additional properties to pass to DME2
+ * @param managed is this sink endpoint managed?
+ *
+ * @return an DMAAP Topic Sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSink build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String partitionKey,
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts) ;
/**
* Instantiates a new DMAAP Topic Sink
@@ -58,7 +105,9 @@ public interface DmaapTopicSinkFactory {
String userName,
String password,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -142,7 +191,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String userName,
String password,
String partitionKey,
- boolean managed)
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (topic == null || topic.isEmpty()) {
@@ -158,7 +215,45 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
new InlineDmaapTopicSink(servers, topic,
apiKey, apiSecret,
userName, password,
- partitionKey);
+ partitionKey,
+ environment, aftEnvironment,
+ partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
+
+ if (managed)
+ dmaapTopicWriters.put(topic, dmaapTopicSink);
+ return dmaapTopicSink;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DmaapTopicSink build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String partitionKey,
+ boolean managed,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (dmaapTopicWriters.containsKey(topic)) {
+ return dmaapTopicWriters.get(topic);
+ }
+
+ DmaapTopicSink dmaapTopicSink =
+ new InlineDmaapTopicSink(servers, topic,
+ apiKey, apiSecret,
+ userName, password,
+ partitionKey, useHttps, allowSelfSignedCerts);
if (managed)
dmaapTopicWriters.put(topic, dmaapTopicSink);
@@ -172,7 +267,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
*/
@Override
public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
- return this.build(servers, topic, null, null, null, null, null, true);
+ return this.build(servers, topic, null, null, null, null, null, true, false, false);
}
@@ -196,12 +291,10 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." +
topic +
PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("No DMAAP servers provided in " + properties);
- continue;
- }
- List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ List<String> serverList;
+ if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ else serverList = new ArrayList<>();
String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
"." + topic +
@@ -223,14 +316,100 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+
+ /* DME2 Properties */
+
+ String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+ String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+ String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+ String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+ String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+ String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+ String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+ String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+ String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
+ + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+ String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+ String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+ String dme2SessionStickinessRequired = properties
+ .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+ Map<String,String> dme2AdditionalProps = new HashMap<>();
+
+ if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+ if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
+ dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+ if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+ if (dme2Version != null && !dme2Version.isEmpty())
+ dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+ if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
+ dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+ if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
+ dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+
+ if (servers == null || servers.isEmpty()) {
+ logger.error("No DMaaP servers or DME2 ServiceName provided");
+ continue;
+ }
+
boolean managed = true;
if (managedString != null && !managedString.isEmpty()) {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
DmaapTopicSink dmaapTopicSink = this.build(serverList, topic,
- apiKey, apiSecret, aafMechId, aafPassword,
- partitionKey, managed);
+ apiKey, apiSecret,
+ aafMechId, aafPassword,
+ partitionKey,
+ dme2Environment, dme2AftEnvironment,
+ dme2Partner, dme2Latitude, dme2Longitude,
+ dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
+
dmaapTopicWriters.add(dmaapTopicSink);
}
return dmaapTopicWriters;
@@ -305,4 +484,4 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
return writers;
}
-}
+} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
index f8d85eb7..9f60556c 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
@@ -24,16 +24,26 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
import org.openecomp.policy.drools.properties.PolicyProperties;
+
/**
* DMAAP Topic Source Factory
*/
public interface DmaapTopicSourceFactory {
+ public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ public final String DME2_VERSION_PROPERTY = "Version";
+ public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
* Creates an DMAAP Topic Source based on properties files
@@ -60,6 +70,8 @@ public interface DmaapTopicSourceFactory {
* @param fetchTimeout Read Fetch Timeout
* @param fetchLimit Fetch Limit
* @param managed is this endpoind managed?
+ * @param useHttps does the connection use HTTPS?
+ * @param allowSelfSignedCerts does connection allow self-signed certificates?
*
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
@@ -74,7 +86,56 @@ public interface DmaapTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
+ throws IllegalArgumentException;
+
+ /**
+ * Instantiates a new DMAAP Topic Source
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName user name
+ * @param password password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Read Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @param environment DME2 environment
+ * @param aftEnvironment DME2 AFT environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 latitude
+ * @param longitude DME2 longitude
+ * @param additionalProps additional properties to pass to DME2
+ * @param managed is this endpoind managed?
+ * @param useHttps does the connection use HTTPS?
+ * @param allowSelfSignedCerts does connection allow self-signed certificates?
+ *
+ * @return an DMAAP Topic Source
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSource build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String consumerGroup,
+ String consumerInstance,
+ int fetchTimeout,
+ int fetchLimit,
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -104,7 +165,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers,
+ public DmaapTopicSource build(List<String> servers,
String topic)
throws IllegalArgumentException;
@@ -154,7 +215,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
new HashMap<String, DmaapTopicSource>();
-
+
/**
* {@inheritDoc}
*/
@@ -169,7 +230,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ String environment,
+ String aftEnvironment,
+ String partner,
+ String latitude,
+ String longitude,
+ Map<String,String> additionalProps,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (topic == null || topic.isEmpty()) {
@@ -185,7 +254,53 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
new SingleThreadedDmaapTopicSource(servers, topic,
apiKey, apiSecret, userName, password,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit,
+ environment, aftEnvironment, partner,
+ latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
+
+ if (managed)
+ dmaapTopicSources.put(topic, dmaapTopicSource);
+
+ return dmaapTopicSource;
+ }
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DmaapTopicSource build(List<String> servers,
+ String topic,
+ String apiKey,
+ String apiSecret,
+ String userName,
+ String password,
+ String consumerGroup,
+ String consumerInstance,
+ int fetchTimeout,
+ int fetchLimit,
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("DMaaP Server(s) must be provided");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized(this) {
+ if (dmaapTopicSources.containsKey(topic)) {
+ return dmaapTopicSources.get(topic);
+ }
+
+ DmaapTopicSource dmaapTopicSource =
+ new SingleThreadedDmaapTopicSource(servers, topic,
+ apiKey, apiSecret, userName, password,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
if (managed)
dmaapTopicSources.put(topic, dmaapTopicSource);
@@ -216,12 +331,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
topic +
PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("No UEB servers provided in " + properties);
- continue;
- }
-
- List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ List<String> serverList;
+ if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+ else serverList = new ArrayList<>();
String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
"." + topic +
@@ -250,6 +362,70 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
"." + topic +
PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+
+ /* DME2 Properties */
+
+ String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+
+ String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+
+ String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
+
+ String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
+
+ String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+
+ String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+
+ String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
+
+ String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
+
+ String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
+
+ String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
+
+ String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
+
+ String dme2SessionStickinessRequired = properties
+ .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
+
+ Map<String,String> dme2AdditionalProps = new HashMap<>();
+
+ if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
+ if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
+ dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
+ if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
+ if (dme2Version != null && !dme2Version.isEmpty())
+ dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
+ if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
+ dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
+ dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
+ if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
+ dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
+
+
+ if (servers == null || servers.isEmpty()) {
+
+ logger.error("No DMaaP servers or DME2 ServiceName provided");
+ continue;
+ }
+
int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
try {
@@ -279,10 +455,33 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
+
DmaapTopicSource uebTopicSource = this.build(serverList, topic,
apiKey, apiSecret, aafMechId, aafPassword,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, managed);
+ fetchTimeout, fetchLimit,
+ dme2Environment, dme2AftEnvironment, dme2Partner,
+ dme2Latitude, dme2Longitude, dme2AdditionalProps,
+ managed, useHttps, allowSelfSignedCerts);
+
dmaapTopicSource_s.add(uebTopicSource);
}
}
@@ -291,25 +490,29 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
/**
* {@inheritDoc}
+ * @throws IllegalArgumentException
*/
@Override
public DmaapTopicSource build(List<String> servers,
String topic,
String apiKey,
- String apiSecret) {
+ String apiSecret) throws IllegalArgumentException {
return this.build(servers, topic,
apiKey, apiSecret, null, null,
null, null,
DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
DmaapTopicSource.DEFAULT_LIMIT_FETCH,
- true);
+ true,
+ false,
+ false);
}
/**
* {@inheritDoc}
+ * @throws IllegalArgumentException
*/
@Override
- public DmaapTopicSource build(List<String> servers, String topic) {
+ public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
return this.build(servers, topic, null, null);
}
@@ -352,7 +555,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
if (dmaapTopicSources.containsKey(topic)) {
return dmaapTopicSources.get(topic);
} else {
- throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found");
+ throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
}
}
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
index 85b98838..432f035c 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
@@ -54,7 +54,9 @@ public interface UebTopicSinkFactory {
String apiKey,
String apiSecret,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -135,9 +137,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
String apiKey,
String apiSecret,
String partitionKey,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("UEB Server(s) must be provided");
+ }
+
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("A topic must be provided");
}
@@ -149,7 +157,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
UebTopicSink uebTopicWriter =
new InlineUebTopicSink(servers, topic,
- apiKey, apiSecret,partitionKey);
+ apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts);
if (managed)
uebTopicSinks.put(topic, uebTopicWriter);
@@ -164,7 +172,8 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
*/
@Override
public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
- return this.build(servers, topic, null, null, null, true);
+
+ return this.build(servers, topic, null, null, null, true, false, false);
}
@@ -212,9 +221,28 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
UebTopicSink uebTopicWriter = this.build(serverList, topic,
apiKey, apiSecret,
- partitionKey, managed);
+ partitionKey, managed, useHttps, allowSelfSignedCerts);
uebTopicWriters.add(uebTopicWriter);
}
return uebTopicWriters;
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
index bf2a4038..1729576f 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
@@ -71,7 +71,9 @@ public interface UebTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException;
/**
@@ -162,8 +164,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
String consumerInstance,
int fetchTimeout,
int fetchLimit,
- boolean managed)
+ boolean managed,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("UEB Server(s) must be provided");
+ }
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("A topic must be provided");
@@ -178,7 +185,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
new SingleThreadedUebTopicSource(servers, topic,
apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
if (managed)
uebTopicSources.put(topic, uebTopicSource);
@@ -263,10 +270,28 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
managed = Boolean.parseBoolean(managedString);
}
+ String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+
+ //default is to use HTTP if no https property exists
+ boolean useHttps = false;
+ if (useHttpsString != null && !useHttpsString.isEmpty()){
+ useHttps = Boolean.parseBoolean(useHttpsString);
+ }
+
+ String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
+ PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+
+ //default is to disallow self-signed certs
+ boolean allowSelfSignedCerts = false;
+ if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
+ allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
+ }
+
UebTopicSource uebTopicSource = this.build(serverList, topic,
apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, managed);
+ fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
uebTopicSources.add(uebTopicSource);
}
}
@@ -281,11 +306,12 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
String topic,
String apiKey,
String apiSecret) {
+
return this.build(servers, topic,
apiKey, apiSecret,
null, null,
UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
- UebTopicSource.DEFAULT_LIMIT_FETCH, true);
+ UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
}
/**
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
index 6fee5ce0..a34d361b 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -22,12 +22,20 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
@@ -76,17 +84,40 @@ public interface BusConsumer {
public CambriaConsumerWrapper(List<String> servers, String topic,
String apiKey, String apiSecret,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
throws IllegalArgumentException {
ConsumerBuilder builder =
new CambriaClientBuilders.ConsumerBuilder();
- builder.knownAs(consumerGroup, consumerInstance)
+
+ if (useHttps){
+
+ if(useSelfSignedCerts){
+ builder.knownAs(consumerGroup, consumerInstance)
+ .usingHosts(servers)
+ .onTopic(topic)
+ .waitAtServer(fetchTimeout)
+ .receivingAtMost(fetchLimit)
+ .usingHttps()
+ .allowSelfSignedCertificates();
+ }
+ else{
+ builder.knownAs(consumerGroup, consumerInstance)
+ .usingHosts(servers)
+ .onTopic(topic)
+ .waitAtServer(fetchTimeout)
+ .receivingAtMost(fetchLimit)
+ .usingHttps();
+ }
+ }
+ else{
+ builder.knownAs(consumerGroup, consumerInstance)
.usingHosts(servers)
.onTopic(topic)
.waitAtServer(fetchTimeout)
.receivingAtMost(fetchLimit);
+ }
if (apiKey != null && !apiKey.isEmpty() &&
apiSecret != null && !apiSecret.isEmpty()) {
@@ -123,8 +154,11 @@ public interface BusConsumer {
/**
* MR based consumer
*/
- public static class DmaapConsumerWrapper implements BusConsumer {
+ public abstract class DmaapConsumerWrapper implements BusConsumer {
+ protected int fetchTimeout;
+ protected Object closeCondition = new Object();
+
/**
* MR Consumer
*/
@@ -137,47 +171,75 @@ public interface BusConsumer {
* @param topic topic
* @param apiKey API Key
* @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
+ * @param username AAF Login
+ * @param password AAF Password
* @param consumerGroup Consumer Group
* @param consumerInstance Consumer Instance
* @param fetchTimeout Fetch Timeout
* @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
*/
+ @SuppressWarnings("unchecked")
public DmaapConsumerWrapper(List<String> servers, String topic,
String apiKey, String apiSecret,
- String aafLogin, String aafPassword,
+ String username, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
- throws Exception {
+ int fetchTimeout, int fetchLimit, boolean useHttps)
+
+ throws MalformedURLException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
this.consumer = new MRConsumerImpl(servers, topic,
consumerGroup, consumerInstance,
fetchTimeout, fetchLimit,
null, apiKey, apiSecret);
- this.consumer.setUsername(aafLogin);
- this.consumer.setPassword(aafPassword);
+ this.consumer.setUsername(username);
+ this.consumer.setPassword(password);
+
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
- this.consumer.setProps(props);
- this.consumer.setHost(servers.get(0) + ":3904");;
}
/**
* {@inheritDoc}
*/
public Iterable<String> fetch() throws Exception {
- return this.consumer.fetch();
+ MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+
+ if (PolicyLogger.isDebugEnabled() && response != null)
+ PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
+
+ if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
+ if (response.getResponseCode() == null)
+ PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null");
+ else
+ PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage());
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+ }
+
+ if (response.getActualMessages() == null)
+ return new ArrayList<String>();
+ else
+ return response.getActualMessages();
}
/**
* {@inheritDoc}
*/
public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
this.consumer.close();
}
@@ -196,7 +258,181 @@ public interface BusConsumer {
}
}
+ /**
+ * MR based consumer
+ */
+ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+ private Properties props;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapAafConsumerWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String aafLogin, String aafPassword,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
+
+ super(servers, topic, apiKey, apiSecret,
+ aafLogin, aafPassword,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps);
+
+ // super constructor sets servers = {""} if empty to avoid errors when using DME2
+ if ((servers.size() == 1 && servers.get(0).equals("")) ||
+ (servers == null) || (servers.size() == 0)) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ props = new Properties();
+
+ if(useHttps){
+ props.setProperty("Protocol", "https");
+ this.consumer.setHost(servers.get(0) + ":3905");
+
+ }
+ else{
+ props.setProperty("Protocol", "http");
+ this.consumer.setHost(servers.get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
+
+ builder.
+ append("DmaapConsumerWrapper [").
+ append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
+ append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
+ append(", consumer.getHost()=").append(consumer.getHost()).
+ append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
+ append(", consumer.getUsername()=").append(consumer.getUsername()).
+ append("]");
+ return builder.toString();
+ }
+ }
+ public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
+ private Properties props;
+
+ public DmaapDmeConsumerWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String dme2Login, String dme2Password,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit,
+ String environment, String aftEnvironment, String dme2Partner,
+ String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
+
+
+
+ super(servers, topic, apiKey, apiSecret,
+ dme2Login, dme2Password,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps);
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ } if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
+ PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ this.consumer.setUsername(dme2Login);
+ this.consumer.setPassword(dme2Password);
+
+ props = new Properties();
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ props.setProperty("username", dme2Login);
+ props.setProperty("password", dme2Password);
+
+ /* These are required, no defaults */
+ props.setProperty("topic", topic);
+
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "GET");
+
+ if(useHttps){
+ props.setProperty("Protocol", "https");
+
+ }
+ else{
+ props.setProperty("Protocol", "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+
+ if (additionalProps != null) {
+ for(String key : additionalProps.keySet())
+ props.put(key, additionalProps.get(key));
+ }
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this);
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
index 798bf989..b5595b2d 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -24,14 +24,20 @@ import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.LoggerFactory;
+
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -65,15 +71,21 @@ public interface BusPublisher {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
- String apiSecret)
- throws IllegalArgumentException {
+ String apiSecret, boolean useHttps) throws IllegalArgumentException {
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(servers)
+
+
+ if (useHttps){
+
+ builder.usingHosts(servers)
+ .onTopic(topic)
+ .usingHttps();
+ }
+ else{
+ builder.usingHosts(servers)
.onTopic(topic);
-
- // Only supported in 0.2.4 version
- // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER);
+ }
+
if (apiKey != null && !apiKey.isEmpty() &&
apiSecret != null && !apiSecret.isEmpty()) {
@@ -142,43 +154,105 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper
*/
- public static class DmaapPublisherWrapper implements BusPublisher {
+ public abstract class DmaapPublisherWrapper implements BusPublisher {
/**
* MR based Publisher
*/
protected MRSimplerBatchPublisher publisher;
+ protected Properties props;
- public DmaapPublisherWrapper(List<String> servers, String topic,
- String aafLogin,
- String aafPassword) {
+ /**
+ * MR Publisher Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param username AAF or DME2 Login
+ * @param password AAF or DME2 Password
+ */
+ public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
+ List<String> servers, String topic,
+ String username,
+ String password, boolean useHttps) throws IllegalArgumentException {
+
- ArrayList<String> dmaapServers = new ArrayList<String>();
- for (String server: servers) {
- dmaapServers.add(server + ":3904");
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
}
-
- this.publisher =
- new MRSimplerBatchPublisher.Builder().
- againstUrls(dmaapServers).
- onTopic(topic).
- build();
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ if (servers == null || servers.isEmpty())
+ throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
+
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ if(useHttps){
+ for (String server: servers) {
+ dmaapServers.add(server + ":3905");
+ }
+
+ }
+ else{
+ for (String server: servers) {
+ dmaapServers.add(server + ":3904");
+ }
+ }
+
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ } else if (protocol == ProtocolTypeConstants.DME2) {
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ dmaapServers.add("0.0.0.0:3904");
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ }
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
- this.publisher.setUsername(aafLogin);
- this.publisher.setPassword(aafPassword);
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
+ props = new Properties();
+
+ if(useHttps){
+
+ props.setProperty("Protocol", "https");
+ }
+ else{
+
+ props.setProperty("Protocol", "http");
+
+ }
+
props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
this.publisher.setProps(props);
- this.publisher.setHost(servers.get(0));
+ if (protocol == ProtocolTypeConstants.AAF_AUTH)
+ this.publisher.setHost(servers.get(0));
- if (PolicyLogger.isInfoEnabled())
+ if (PolicyLogger.isInfoEnabled()) {
PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
"CREATION: " + this);
+ PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
+ "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue());
+ }
+
}
/**
@@ -208,7 +282,16 @@ public interface BusPublisher {
if (message == null)
throw new IllegalArgumentException("No message provided");
+ this.publisher.setPubResponse(new MRPublisherResponse());
this.publisher.send(partitionId, message);
+ MRPublisherResponse response = this.publisher.sendBatchWithResponse();
+ if (PolicyLogger.isDebugEnabled() && response != null) {
+ PolicyLogger.debug(DmaapPublisherWrapper.class.getName(),
+ "DMaaP publisher received " + response.getResponseCode() + ": "
+ + response.getResponseMessage());
+
+ }
+
return true;
}
@@ -227,5 +310,97 @@ public interface BusPublisher {
return builder.toString();
}
}
+
+ /**
+ * DmaapClient library wrapper
+ */
+ public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ /**
+ * MR based Publisher
+ */
+ protected MRSimplerBatchPublisher publisher;
+
+ public DmaapAafPublisherWrapper(List<String> servers, String topic,
+ String aafLogin,
+ String aafPassword, boolean useHttps) {
+
+ super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
+ }
+ }
+
+ public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ public DmaapDmePublisherWrapper(List<String> servers, String topic,
+ String username, String password,
+ String environment, String aftEnvironment, String dme2Partner,
+ String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
+
+ super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
+
+
+
+
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ } if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
+ PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ /* These are required, no defaults */
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ // ServiceName also a default, found in additionalProps
+
+ /* These are optional, will default to these values if not set in optionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "POST");
+
+ for (String key : additionalProps.keySet()) {
+ String value = additionalProps.get(key);
+
+ if (value != null)
+ props.setProperty(key, value);
+ }
+
+ this.publisher.setProps(props);
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
index e36e3afc..4ac1c6fc 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
@@ -23,7 +23,6 @@ package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
import org.apache.commons.collections4.queue.CircularFifoQueue;
-
import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.BusTopic;
@@ -35,13 +34,30 @@ public abstract class BusTopicBase implements BusTopic, Topic {
protected String apiKey;
protected String apiSecret;
+ protected boolean useHttps;
+ protected boolean allowSelfSignedCerts;
protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+ /**
+ * Instantiates a new Bus Topic Base
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @return a Bus Topic Base
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
public BusTopicBase(List<String> servers,
String topic,
String apiKey,
- String apiSecret)
+ String apiSecret,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
if (servers == null || servers.isEmpty()) {
@@ -57,6 +73,8 @@ public abstract class BusTopicBase implements BusTopic, Topic {
this.apiKey = apiKey;
this.apiSecret = apiSecret;
+ this.useHttps = useHttps;
+ this.allowSelfSignedCerts = allowSelfSignedCerts;
}
/**
@@ -92,6 +110,20 @@ public abstract class BusTopicBase implements BusTopic, Topic {
}
/**
+ * @return useHttps
+ */
+ public boolean isUseHttps(){
+ return useHttps;
+ }
+
+ /**
+ * @return allowSelfSignedCerts
+ */
+ public boolean isAllowSelfSignedCerts(){
+ return allowSelfSignedCerts;
+ }
+
+ /**
* @return the recentEvents
*/
@Override
@@ -104,8 +136,13 @@ public abstract class BusTopicBase implements BusTopic, Topic {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=")
- .append(apiKey).append(", apiSecret=").append(apiSecret).append("]");
+ builder.append("UebTopicBase [servers=").append(servers)
+ .append(", topic=").append(topic)
+ .append(", apiKey=").append(apiKey)
+ .append(", apiSecret=").append(apiSecret)
+ .append(", useHttps=").append(useHttps)
+ .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts)
+ .append("]");
return builder.toString();
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
index bd88818b..a78de716 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -83,13 +83,15 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* @param apiKey api secret
* @param apiSecret api secret
* @param partitionId partition id
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
* @throws IllegalArgumentException in invalid parameters are passed in
*/
public InlineBusTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret, String partitionId)
+ String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret);
+ super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
if (partitionId == null || partitionId.isEmpty()) {
this.partitionId = UUID.randomUUID ().toString();
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index 417c6d47..f5a3dc11 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -21,11 +21,12 @@
package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
+import java.util.Map;
-import org.openecomp.policy.drools.event.comm.Topic;
-import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
/**
* This implementation publishes events for the associated DMAAP topic,
@@ -39,13 +40,68 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
protected final String userName;
protected final String password;
+ protected String environment = null;
+ protected String aftEnvironment = null;
+ protected String partner = null;
+ protected String latitude = null;
+ protected String longitude = null;
+
+ protected Map<String,String> additionalProps = null;
+
+ /**
+ *
+ * @param servers DMaaP servers
+ * @param topic DMaaP Topic to be monitored
+ * @param apiKey DMaaP API Key (optional)
+ * @param apiSecret DMaaP API Secret (optional)
+ * @param consumerGroup DMaaP Reader Consumer Group
+ * @param consumerInstance DMaaP Reader Instance
+ * @param fetchTimeout DMaaP fetch timeout
+ * @param fetchLimit DMaaP fetch limit
+ * @param environment DME2 Environment
+ * @param aftEnvironment DME2 AFT Environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 Latitude
+ * @param longitude DME2 Longitude
+ * @param additionalProps Additional properties to pass to DME2
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @throws IllegalArgumentException An invalid parameter passed in
+ */
+ public InlineDmaapTopicSink(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String userName, String password,
+ String partitionKey,
+ String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String,String> additionalProps,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
+
+ this.userName = userName;
+ this.password = password;
+
+ this.environment = environment;
+ this.aftEnvironment = aftEnvironment;
+ this.partner = partner;
+
+ this.latitude = latitude;
+ this.longitude = longitude;
+
+ this.additionalProps = additionalProps;
+
+ this.init();
+ }
+
public InlineDmaapTopicSink(List<String> servers, String topic,
String apiKey, String apiSecret,
String userName, String password,
- String partitionKey)
+ String partitionKey, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret, partitionKey);
+ super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
this.userName = userName;
this.password = password;
@@ -54,11 +110,24 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
@Override
public void init() {
- this.publisher =
- new BusPublisher.DmaapPublisherWrapper(this.servers,
+ if ((this.environment == null || this.environment.isEmpty()) &&
+ (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
+ (this.latitude == null || this.latitude.isEmpty()) &&
+ (this.longitude == null || this.longitude.isEmpty()) &&
+ (this.partner == null || this.partner.isEmpty())) {
+ this.publisher =
+ new BusPublisher.DmaapAafPublisherWrapper(this.servers,
this.topic,
this.userName,
- this.password);
+ this.password, this.useHttps);
+ } else {
+ this.publisher =
+ new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
+ this.userName, this.password,
+ this.environment, this.aftEnvironment,
+ this.partner, this.latitude, this.longitude,
+ this.additionalProps, this.useHttps);
+ }
if (logger.isInfoEnabled())
logger.info("DMAAP SINK TOPIC created " + this);
}
@@ -81,4 +150,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
return builder.toString();
}
-}
+} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
index 2d4b1552..c93e0f2b 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
@@ -46,6 +46,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
* @param apiKey the api key (optional)
* @param apiSecret the api secret (optional)
* @param partitionId the partition key (optional, autogenerated if not provided)
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
*
* @throws IllegalArgumentException if invalid arguments are detected
*/
@@ -53,9 +55,11 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
String topic,
String apiKey,
String apiSecret,
- String partitionId)
+ String partitionId,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret, partitionId);
+ super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
}
/**
@@ -68,7 +72,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
this.apiKey,
- this.apiSecret);
+ this.apiSecret,
+ this.useHttps);
if (logger.isInfoEnabled())
logger.info("UEB SINK TOPIC created " + this);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index f37c349e..d3be9163 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -98,6 +98,7 @@ public abstract class SingleThreadedBusTopicSource
*/
protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
+
/**
*
* @param servers Bus servers
@@ -108,6 +109,8 @@ public abstract class SingleThreadedBusTopicSource
* @param consumerInstance Bus Reader Instance
* @param fetchTimeout Bus fetch timeout
* @param fetchLimit Bus fetch limit
+ * @param useHttps does the bus use https
+ * @param allowSelfSignedCerts are self-signed certificates allowed
* @throws IllegalArgumentException An invalid parameter passed in
*/
public SingleThreadedBusTopicSource(List<String> servers,
@@ -117,10 +120,12 @@ public abstract class SingleThreadedBusTopicSource
String consumerGroup,
String consumerInstance,
int fetchTimeout,
- int fetchLimit)
+ int fetchLimit,
+ boolean useHttps,
+ boolean allowSelfSignedCerts)
throws IllegalArgumentException {
- super(servers, topic, apiKey, apiSecret);
+ super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
if (consumerGroup == null || consumerGroup.isEmpty()) {
this.consumerGroup = UUID.randomUUID ().toString();
@@ -145,6 +150,7 @@ public abstract class SingleThreadedBusTopicSource
} else {
this.fetchLimit = fetchLimit;
}
+
}
/**
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index e65d44a7..2ced5bcb 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -21,10 +21,11 @@
package org.openecomp.policy.drools.event.comm.bus.internal;
import java.util.List;
+import java.util.Map;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
-import org.openecomp.policy.common.logging.eelf.PolicyLogger;
/**
* This topic reader implementation specializes in reading messages
@@ -33,10 +34,21 @@ import org.openecomp.policy.common.logging.eelf.PolicyLogger;
public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
implements DmaapTopicSource, Runnable {
+
+ protected boolean allowSelfSignedCerts;
protected final String userName;
protected final String password;
private String className = SingleThreadedDmaapTopicSource.class.getName();
+
+ protected String environment = null;
+ protected String aftEnvironment = null;
+ protected String partner = null;
+ protected String latitude = null;
+ protected String longitude = null;
+
+ protected Map<String,String> additionalProps = null;
+
/**
*
* @param servers DMaaP servers
@@ -47,19 +59,75 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
* @param consumerInstance DMaaP Reader Instance
* @param fetchTimeout DMaaP fetch timeout
* @param fetchLimit DMaaP fetch limit
+ * @param environment DME2 Environment
+ * @param aftEnvironment DME2 AFT Environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 Latitude
+ * @param longitude DME2 Longitude
+ * @param additionalProps Additional properties to pass to DME2
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
+ *
+ * @throws IllegalArgumentException An invalid parameter passed in
+ */
+ public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String userName, String password,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit,
+ String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String,String> additionalProps,
+ boolean useHttps, boolean allowSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ super(servers, topic, apiKey, apiSecret,
+ consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
+
+ this.userName = userName;
+ this.password = password;
+
+ this.environment = environment;
+ this.aftEnvironment = aftEnvironment;
+ this.partner = partner;
+
+ this.latitude = latitude;
+ this.longitude = longitude;
+
+ this.additionalProps = additionalProps;
+ try {
+ this.init();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ *
+ * @param servers DMaaP servers
+ * @param topic DMaaP Topic to be monitored
+ * @param apiKey DMaaP API Key (optional)
+ * @param apiSecret DMaaP API Secret (optional)
+ * @param consumerGroup DMaaP Reader Consumer Group
+ * @param consumerInstance DMaaP Reader Instance
+ * @param fetchTimeout DMaaP fetch timeout
+ * @param fetchLimit DMaaP fetch limit
+ * @param useHttps does connection use HTTPS?
+ * @param allowSelfSignedCerts are self-signed certificates allow
* @throws IllegalArgumentException An invalid parameter passed in
*/
public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
String apiKey, String apiSecret,
String userName, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
this.userName = userName;
this.password = password;
@@ -78,22 +146,35 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
*/
@Override
public void init() throws Exception {
-
if (this.userName == null || this.userName.isEmpty() ||
- this.password == null || this.password.isEmpty()) {
- this.consumer =
- new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
- } else {
- this.consumer =
- new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.userName, this.password,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
- }
+ this.password == null || this.password.isEmpty()) {
+ this.consumer =
+ new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
+ } else if ((this.environment == null || this.environment.isEmpty()) &&
+ (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
+ (this.latitude == null || this.latitude.isEmpty()) &&
+ (this.longitude == null || this.longitude.isEmpty()) &&
+ (this.partner == null || this.partner.isEmpty())) {
+ this.consumer =
+ new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit, this.useHttps);
+ } else {
+ this.consumer =
+ new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit,
+ this.environment, this.aftEnvironment, this.partner,
+ this.latitude, this.longitude, this.additionalProps, this.useHttps);
+ }
PolicyLogger.info(className, "CREATION: " + this);
}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index edb55c75..641eddaa 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -42,18 +42,24 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
* @param consumerInstance UEB Reader Instance
* @param fetchTimeout UEB fetch timeout
* @param fetchLimit UEB fetch limit
+ * @param useHttps does topicSource use HTTPS?
+ * @param allowSelfSignedCerts does topicSource allow self-signed certs?
+ *
* @throws IllegalArgumentException An invalid parameter passed in
*/
+
+
public SingleThreadedUebTopicSource(List<String> servers, String topic,
String apiKey, String apiSecret,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
throws IllegalArgumentException {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
+ this.allowSelfSignedCerts = allowSelfSignedCerts;
this.init();
}
@@ -67,7 +73,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit);
+ this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
}
/**
@@ -77,6 +83,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
public CommInfrastructure getTopicCommInfrastructure() {
return Topic.CommInfrastructure.UEB;
}
+
@Override
public String toString() {